利用线程池解析大文件

  1. ReadFileThread

      /**
       * Thread that reads one byte range of a file with {@link ReadFile} and
       * forwards every decoded line to the supplied listener.
       */
      public class ReadFileThread extends Thread {

          // Path of the file to read.
          private final String filePath;
          // Byte offset at which reading starts.
          private final long start;
          // Byte offset handed in by the caller; run() passes end + 1 to the reader.
          private final long end;
          // Receives the lines and supplies the charset name.
          private final ReaderFileListener processPoiDataListeners;

          /**
           * Stores the range to read, appends "-ReadFileThread" to the thread
           * name and prints the resulting name to standard output.
           *
           * @param processPoiDataListeners listener that consumes the lines
           * @param start byte offset at which reading starts
           * @param end byte offset of the last byte to read; run() adds one to it
           * @param file path of the file to read
           */
          public ReadFileThread(ReaderFileListener processPoiDataListeners, long start, long end, String file) {
              String renamed = getName() + "-ReadFileThread";
              setName(renamed);
              System.out.println(getName());
              this.processPoiDataListeners = processPoiDataListeners;
              this.filePath = file;
              this.start = start;
              this.end = end;
          }

          /**
           * Creates a {@link ReadFile}, configures it from the listener and reads
           * the range {@code [start, end + 1)}. Any exception is printed, not rethrown.
           */
          @Override
          public void run() {
              ReadFile reader = new ReadFile();
              reader.setReaderListener(processPoiDataListeners);
              reader.setEncode(processPoiDataListeners.getEncode());
              // The reader treats its end argument as exclusive, hence the + 1.
              long exclusiveEnd = end + 1;
              try {
                  reader.readFileByLine(filePath, start, exclusiveEnd);
              } catch (Exception failure) {
                  failure.printStackTrace();
              }
          }

      }

  1. ReadFile

       import java.io.*;
     import java.nio.ByteBuffer;
     import java.nio.channels.FileChannel;
     import java.util.Observable;
    
     /**
      * Reads a byte range of a file through a {@link FileChannel}, splits it into
      * lines on the newline byte and hands each decoded line to a
      * {@link ReaderFileListener}. Observers are notified once a range has been
      * read completely.
      *
      * @author: LiMing
      * @since: 2022/5/27 17:43
      **/
     public class ReadFile extends Observable {

         // Size in bytes of the buffer used for every channel read.
         private int bufSize = 1024;
         // Line terminator byte.
         private byte key = (byte) '\n';
         // Number of lines emitted so far; it is not reset between calls.
         private long lineNum = 0;
         // Charset name used to decode lines, gb2312 unless overridden.
         private String encode = "gb2312";
         // Listener that receives every decoded line.
         private ReaderFileListener readerListener;

         /**
          * Sets the charset name used to decode lines.
          *
          * @param encode charset name; {@code null} or an empty string is ignored
          *               so that the current value (gb2312 by default) is kept
          */
         public void setEncode(String encode) {
             if (encode != null && !encode.isEmpty()) {
                 this.encode = encode;
             }
         }

         /**
          * Sets the listener that receives the decoded lines.
          *
          * @param readerListener listener invoked for every line
          */
         public void setReaderListener(ReaderFileListener readerListener) {
             this.readerListener = readerListener;
         }

         /**
          * Finds the offset of the first newline byte at or after {@code position}.
          *
          * @param file file to scan
          * @param position byte offset at which the scan starts
          * @return absolute offset of the newline byte, or {@code position} when
          *         the rest of the file contains no newline
          * @throws Exception if the file cannot be opened or read
          */
         public long getStartNum(File file, long position) throws Exception {
             try (RandomAccessFile raf = new RandomAccessFile(file, "r");
                  FileChannel channel = raf.getChannel()) {
                 channel.position(position);
                 ByteBuffer buffer = ByteBuffer.allocate(bufSize);
                 // Absolute file offset of the first byte currently in the buffer.
                 long chunkStart = position;
                 while (channel.read(buffer) != -1) {
                     buffer.flip();
                     int available = buffer.limit();
                     for (int i = 0; i < available; i++) {
                         if (buffer.get(i) == key) {
                             return chunkStart + i;
                         }
                     }
                     // Advance by the bytes actually read, not by the buffer capacity.
                     chunkStart += available;
                     buffer.clear();
                 }
             }
             return position;
         }

         /**
          * Reads the file from {@code start} and passes every line to the listener.
          * Lines found inside the range are reported with {@code over == false};
          * whatever follows the last newline is reported once with
          * {@code over == true}. Observers are then notified with the string
          * {@code start + "-" + end}.
          *
          * @param fullPath path of the file to read
          * @param start byte offset at which reading starts
          * @param end exclusive end offset; a value of zero or less means
          *            "read to the end of the file"
          * @throws Exception if the file does not exist, cannot be read, the
          *                   charset is unsupported, or the listener fails;
          *                   observers are not notified in that case
          */
         public void readFileByLine(String fullPath, long start, long end) throws Exception {
             File fin = new File(fullPath);
             if (!fin.exists()) {
                 throw new FileNotFoundException("没有找到文件:" + fullPath);
             }
             try (RandomAccessFile raf = new RandomAccessFile(fin, "r");
                  FileChannel fcin = raf.getChannel()) {
                 fcin.position(start);
                 ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);
                 // Bytes after the last newline seen so far (an unfinished line).
                 byte[] pending = new byte[0];
                 // File offset just past the last byte read.
                 long nowCur = start;
                 int rSize;
                 while ((rSize = fcin.read(rBuffer)) != -1) {
                     // Count the bytes actually read; the last read is usually short.
                     nowCur += rSize;

                     // Prepend the unfinished line from the previous chunk.
                     byte[] data = new byte[pending.length + rSize];
                     System.arraycopy(pending, 0, data, 0, pending.length);
                     rBuffer.flip();
                     rBuffer.get(data, pending.length, rSize);
                     rBuffer.clear();

                     // Drop everything at or beyond the requested end offset.
                     boolean isEnd = false;
                     if (end > 0 && nowCur > end) {
                         long keep = Math.max(0L, data.length - (nowCur - end));
                         data = substring(data, 0, (int) keep);
                         isEnd = true;
                     }

                     // Emit every complete line contained in the data.
                     int fromIndex = 0;
                     int endIndex;
                     while ((endIndex = indexOf(data, fromIndex)) != -1) {
                         byte[] bLine = substring(data, fromIndex, endIndex);
                         String line = new String(bLine, 0, bLine.length, encode);
                         lineNum++;
                         readerListener.outLine(line.trim(), lineNum, false);
                         fromIndex = endIndex + 1;
                     }
                     pending = substring(data, fromIndex, data.length);
                     if (isEnd) {
                         break;
                     }
                 }
                 // Report the remainder as the final line of this range.
                 String lineStr = new String(pending, 0, pending.length, encode);
                 readerListener.outLine(lineStr.trim(), lineNum, true);
             }
             // Tell observers that this range has been processed.
             setChanged();
             notifyObservers(start+"-"+end);
         }

         /**
          * Returns the index of the first newline byte at or after {@code fromIndex}.
          *
          * @param src bytes to search
          * @param fromIndex index at which the search starts
          * @return index of the newline byte, or -1 if there is none
          */
         private int indexOf(byte[] src, int fromIndex) {
             for (int i = fromIndex; i < src.length; i++) {
                 if (src[i] == key) {
                     return i;
                 }
             }
             return -1;
         }

         /**
          * Copies the bytes in {@code [fromIndex, endIndex)} into a new array.
          *
          * @param src source bytes
          * @param fromIndex index of the first byte to copy
          * @param endIndex index just past the last byte to copy
          * @return new array holding the copied bytes
          */
         private byte[] substring(byte[] src, int fromIndex, int endIndex) {
             int size = endIndex - fromIndex;
             byte[] ret = new byte[size];
             System.arraycopy(src, fromIndex, ret, 0, size);
             return ret;
         }

     }
    
  1. ReaderFileListener

     import java.util.ArrayList;
     import java.util.List;

     /**
      * Base class for line consumers: buffers the lines passed to
      * {@link #outLine} and hands them to {@link #output} in batches.
      */
     public abstract class ReaderFileListener {

         // Number of lines per batch; defaults to 500.
         private int readColNum = 500;

         // Charset name returned by getEncode(); null until setEncode is called.
         private String encode;

         // Lines collected since the last call to output.
         private List<String> list = new ArrayList<String>();

         /**
          * Sets the number of lines per batch.
          *
          * @param readColNum batch size, at least 1
          * @throws IllegalArgumentException if {@code readColNum} is less than 1
          */
         protected void setReadColNum(int readColNum) {
             if (readColNum < 1) {
                 // Zero would make the modulo in outLine throw ArithmeticException.
                 throw new IllegalArgumentException("readColNum must be >= 1 but was " + readColNum);
             }
             this.readColNum = readColNum;
         }

         /**
          * @return the charset name set through {@link #setEncode}, or
          *         {@code null} if none was set
          */
         public String getEncode() {
             return encode;
         }

         /**
          * Sets the charset name.
          *
          * @param encode charset name
          */
         public void setEncode(String encode) {
             this.encode = encode;
         }

         /**
          * Buffers one line and flushes the buffer to {@link #output} when
          * {@code over} is true or {@code lineNum} is a multiple of the batch size.
          *
          * @param lineStr line content; {@code null} is not buffered
          * @param lineNum line number of this line
          * @param over whether this is the last line
          * @throws Exception if {@link #output} fails
          */
         public void outLine(String lineStr, long lineNum, boolean over) throws Exception {
             if (null != lineStr) {
                 list.add(lineStr);
             }
             if (over || lineNum % readColNum == 0) {
                 output(list);
                 list.clear();
             }
         }

         /**
          * Processes one batch of lines. The list is cleared as soon as this
          * method returns, so implementations must copy it if they keep it.
          *
          * @param stringList lines of the batch
          * @throws Exception if processing fails
          */
         public abstract void output(List<String> stringList) throws Exception;

     }
    
  2. ProcessDataByPostgisListeners

     import java.util.List;
    
     /**
      * Listener implementation that prints every line of a batch to standard
      * output, one line per call to {@code println}.
      *
      * @author: LiMing
      * @since: 2022/5/30 17:51
      **/
     public class ProcessDataByPostgisListeners extends ReaderFileListener {
         /**
          * Prints the lines of the batch in list order.
          *
          * @param stringList lines of the batch
          * @throws Exception declared by the overridden method
          */
         @Override
         public void output(List<String> stringList) throws Exception {
             final int count = stringList.size();
             for (int idx = 0; idx < count; idx++) {
                 System.out.println(stringList.get(idx));
             }
         }
     }
    
  3. BuildData

     import java.io.File;
     import java.io.FileInputStream;
     import java.io.IOException;
     import java.util.concurrent.SynchronousQueue;
     import java.util.concurrent.ThreadPoolExecutor;
     import java.util.concurrent.TimeUnit;
    
     /**
      * Entry point: splits a log file into line-aligned byte ranges and reads
      * each range on a pool thread.
      */
     public class BuildData {

         // File parsed when no path is given on the command line.
         private static final String DEFAULT_LOG_PATH =
                 "D:\\Users\\TomorrowLi\\Desktop\\log文件\\92020701_out.log.2022-05-23.16.40.15";

         /**
          * Splits the file into five ranges and submits one reader per range.
          *
          * @param args optional; {@code args[0]} overrides the default file path
          * @throws Exception declared for compatibility; failures while
          *                   computing ranges or submitting tasks are printed
          */
         public static void main(String[] args) throws Exception {
             String path = (args != null && args.length > 0) ? args[0] : DEFAULT_LOG_PATH;
             File file = new File(path);
             if (!file.isFile()) {
                 System.err.println("File not found: " + file.getPath());
                 return;
             }

             // Number of ranges the file is split into.
             int maxThreadNum = 5;
             // Core pool size.
             int corePoolSize = 5;
             // Maximum pool size.
             int maxnumPoolSize = 10;
             // Idle keep-alive time, in minutes.
             long keepAliveTime = 1;

             // Approximate range size. File.length() is a long, so files larger
             // than 2 GB do not overflow the way InputStream.available() would.
             long chunkSize = file.length() / maxThreadNum;

             ReadFile readFile = new ReadFile();
             ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxnumPoolSize, keepAliveTime, TimeUnit.MINUTES, new SynchronousQueue<>());
             try {
                 long startNum = 0;
                 for (int j = 0; j < maxThreadNum; j++) {
                     // Align the end of this range to a newline. The last range
                     // uses -2, which ReadFileThread turns into -1 (read to EOF).
                     long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, chunkSize * (j + 1)) : -2;
                     // Each task gets its own listener, so no buffer is shared.
                     ReaderFileListener listeners = new ProcessDataByPostgisListeners();
                     listeners.setEncode("UTF-8");
                     threadPoolExecutor.submit(new ReadFileThread(listeners, startNum, endNum, file.getPath()));
                     // The next range starts where this one ended.
                     startNum = endNum;
                 }
             } catch (Exception e) {
                 e.printStackTrace();
             } finally {
                 // Always release the pool; otherwise its threads keep the JVM alive.
                 threadPoolExecutor.shutdown();
             }
         }
     }
    
-------------本文结束感谢您的阅读-------------
坚持原创技术分享,您的支持将鼓励我继续创作!