首頁»Java WEB»使用Java處理大文件

使用Java處理大文件

來源:importnew 發布時間:2014-04-10 閱讀次數:

  我最近要處理一套存儲歷史實時數據的大文件fx market data,我很快便意識到,使用傳統的InputStream不能夠將它們讀取到內存,因為每一個文件都超過了4G。甚至編輯器都不能夠打開這些文件。

  在這種特殊情況下,我可以寫一個簡單的bash腳本將這些文件分成更小的文件塊,然后再讀取它。但是我不想這樣做,因為二進制格式會使這個方法失效。

  處理這個問題的方式通常就是使用內存映射文件遞增地處理區域的數據。關于內存映射文件的一個好處就是它們不會使用虛擬內存和換頁空間,因為它們是從磁盤上的文件返回來的數據。

  很好,讓我們來看一看這些文件和額外的一些數據。似乎它們使用逗號分隔的字段包含ASCII文本行。

  格式:[currency-pair],[timestamp],[bid-price],[ask-price]

  例子:EUR/USD,20120102 00:01:30.420,1.29451,1.2949

  我可以為這種格式去寫一個程序,但是,讀取文件和解析文件是無關的概念。讓我們退一步來想一個通用的設計,當在將來面臨相似的問題時這個設計可以被重復利用。

  這個問題可以歸結為遞增地解碼一個已經在無限長的數組中被編碼的記錄,并且沒有耗盡內存。實際上,以逗號分割的示例格式編碼與通常的解決方案是不相關的。所以,很明顯需要一個解碼器來處理不同的格式。

  再來看,知道整個文件處理完成,每一條記錄都不能被解析并保存在內存中,所以我們需要一種方式來轉移記錄,在它們成為垃圾被回收之前可以被寫到其他地方,例如磁盤或者網絡。

  迭代器是處理這個需求的很好的抽象,因為它們就像游標一樣,可以正確的指向某個位置。每一次迭代都可以轉發文件指針,并且可以讓我們使用數據做其他的事情。

  首先來寫一個Decoder 接口,遞增地把對象從MappedByteBuffer中解碼,如果buffer中沒有對象,則返回null。

public interface Decoder<T> {
    public T decode(ByteBuffer buffer);
}

  然后讓FileReader 實現Iterable接口。每一個迭代器將會處理下一個4096字節的數據,并使用Decoder把它們解碼成一個對象的List集合。注意,FileReader 接收文件(files)的list對象,這樣是很好的,因為它可以遍歷數據,并且不需要考慮聚合的問題。順便說一下,4096個字節塊對于大文件來說是非常小的。

public class FileReader implements Iterable<List<T>> {
  private static final long CHUNK_SIZE = 4096;
  private final Decoder<T> decoder;
  private Iterator<File> files;
 
  private FileReader(Decoder<T> decoder, File... files) {
    this(decoder, Arrays.asList(files));
  }
  private FileReader(Decoder<T> decoder, List<File> files) {
    this.files = files.iterator();
    this.decoder = decoder;
  }
  public static <T> FileReader<T> create(Decoder<T> decoder, List<File> files) {
    return new FileReader<T>(decoder, files);
  }
 
  public static <T> FileReader<T> create(Decoder<T> decoder, File... files) {
    return new FileReader<T>(decoder, files);
  }
  @Override
  public Iterator<List<T>> iterator() {
    return new Iterator<List<T>>() {
      private List<T> entries;
      private long chunkPos = 0;
      private MappedByteBuffer buffer;
      private FileChannel channel;
      @Override
      public boolean hasNext() {
        if (buffer == null || !buffer.hasRemaining()) {
          buffer = nextBuffer(chunkPos);
          if (buffer == null) {
            return false;
          }
        }
        T result = null;
        while ((result = decoder.decode(buffer)) != null) {
          if (entries == null) {
            entries = new ArrayList<T>();
          }
          entries.add(result);
        }
        // set next MappedByteBuffer chunk
        chunkPos += buffer.position();
        buffer = null;
        if (entries != null) {
          return true;
        } else {
          Closeables.closeQuietly(channel);
          return false;
        }
      }
 
      private MappedByteBuffer nextBuffer(long position) {
        try {
          if (channel == null || channel.size() == position) {
            if (channel != null) {
              Closeables.closeQuietly(channel);
              channel = null;
            }
            if (files.hasNext()) {
              File file = files.next();
              channel = new RandomAccessFile(file, "r").getChannel();
              chunkPos = 0;
              position = 0;
            } else {
              return null;
            }
          }
          long chunkSize = CHUNK_SIZE;
          if (channel.size() - position < chunkSize) {
            chunkSize = channel.size() - position;
          }
           return channel.map(FileChannel.MapMode.READ_ONLY, chunkPos, chunkSize);
        } catch (IOException e) {
           Closeables.closeQuietly(channel);
           throw new RuntimeException(e);
        }
      }
 
      @Override
      public List<T> next() {
        List<T> res = entries;
        entries = null;
        return res;
      }
 
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }
}

  下一個任務就是寫一個Decoder 。針對逗號分隔的任何文本格式,編寫一個TextRowDecoder 類。接收的參數是每行字段的數量和一個字段分隔符,返回byte的二維數組。TextRowDecoder 可以被操作不同字符集的特定格式解碼器重復利用。

public class TextRowDecoder implements Decoder<byte[][]> {
  private static final byte LF = 10;
  private final int numFields;
  private final byte delimiter;
  public TextRowDecoder(int numFields, byte delimiter) {
   this.numFields = numFields;
   this.delimiter = delimiter;
  }
  @Override
  public byte[][] decode(ByteBuffer buffer) {
    int lineStartPos = buffer.position();
    int limit = buffer.limit();
    while (buffer.hasRemaining()) {
      byte b = buffer.get();
      if (b == LF) { // reached line feed so parse line
        int lineEndPos = buffer.position();
        // set positions for one row duplication
        if (buffer.limit() < lineEndPos + 1) {
          buffer.position(lineStartPos).limit(lineEndPos);
        } else {
          buffer.position(lineStartPos).limit(lineEndPos + 1);
        }
        byte[][] entry = parseRow(buffer.duplicate());
        if (entry != null) {
          // reset main buffer
          buffer.position(lineEndPos);
          buffer.limit(limit);
          // set start after LF
          lineStartPos = lineEndPos;
        }
        return entry;
      }
    }
    buffer.position(lineStartPos);
    return null;
  }
 
  public byte[][] parseRow(ByteBuffer buffer) {
    int fieldStartPos = buffer.position();
    int fieldEndPos = 0;
    int fieldNumber = 0;
    byte[][] fields = new byte[numFields][];
    while (buffer.hasRemaining()) {
      byte b = buffer.get();
      if (b == delimiter || b == LF) {
        fieldEndPos = buffer.position();
        // save limit
        int limit = buffer.limit();
        // set positions for one row duplication
        buffer.position(fieldStartPos).limit(fieldEndPos);
        fields[fieldNumber] = parseField(buffer.duplicate(), fieldNumber, fieldEndPos - fieldStartPos - 1);
        fieldNumber++;
        // reset main buffer
        buffer.position(fieldEndPos);
        buffer.limit(limit);
        // set start after LF
        fieldStartPos = fieldEndPos;
      }
      if (fieldNumber == numFields) {
        return fields;
      }
    }
    return null;
  }
 
  private byte[] parseField(ByteBuffer buffer, int pos, int length) {
    byte[] field = new byte[length];
    for (int i = 0; i < field.length; i++) {
      field[i] = buffer.get();
    }
    return field;
  }
}

  這是文件被處理的過程。每一個List包含的元素都從一個單獨的buffer中解碼,每一個元素都是被TextRowDecoder定義的byte二維數組。

TextRowDecoder decoder = new TextRowDecoder(4, comma);
FileReader<byte[][]> reader = FileReader.create(decoder, file.listFiles());
for (List<byte[][]> chunk : reader) {
  // do something with each chunk
}

  我們可以在這里打住,不過還有額外的需求。每一行都包含一個時間戳,每一批都必須分組,使用時間段來代替buffers,如按照天分組、或者按照小時分組。我還想要遍歷每一批的數據,因此,第一反應就是,為FileReader創建一個Iterable包裝器,實現它的行為。一個額外的細節,每一個元素必須通過實現Timestamped接口(這里沒有顯示)提供時間戳到PeriodEntries。

  public class PeriodEntries<T extends Timestamped> implements Iterable<List<T>> {
  private final Iterator<List<T extends Timestamped>> entriesIt;
  private final long interval;
  private PeriodEntries(Iterable<List<T>> entriesIt, long interval) {
    this.entriesIt = entriesIt.iterator();
    this.interval = interval;
  }
 
  public static <T extends Timestamped> PeriodEntries<T> create(Iterable<List<T>> entriesIt, long interval) {
   return new PeriodEntries<T>(entriesIt, interval);
  }
 
  @Override
  public Iterator<List<T extends Timestamped>> iterator() {
    return new Iterator<List<T>>() {
      private Queue<List<T>> queue = new LinkedList<List<T>>();
      private long previous;
      private Iterator<T> entryIt;
 
      @Override
      public boolean hasNext() {
        if (!advanceEntries()) {
          return false;
        }
        T entry =  entryIt.next();
        long time = normalizeInterval(entry);
        if (previous == 0) {
          previous = time;
        }
        if (queue.peek() == null) {
          List<T> group = new ArrayList<T>();
          queue.add(group);
        }
        while (previous == time) {
          queue.peek().add(entry);
          if (!advanceEntries()) {
            break;
          }
          entry = entryIt.next();
          time = normalizeInterval(entry);
        }
        previous = time;
        List<T> result = queue.peek();
        if (result == null || result.isEmpty()) {
          return false;
        }
        return true;
      }
 
      private boolean advanceEntries() {
        // if there are no rows left
        if (entryIt == null || !entryIt.hasNext()) {
          // try get more rows if possible
          if (entriesIt.hasNext()) {
            entryIt = entriesIt.next().iterator();
            return true;
          } else {
            // no more rows
            return false;
          }
        }
        return true;
      }
 
      private long normalizeInterval(Timestamped entry) {
        long time = entry.getTime();
        int utcOffset = TimeZone.getDefault().getOffset(time);
        long utcTime = time + utcOffset;
        long elapsed = utcTime % interval;
        return time - elapsed;
      }
      @Override
      public List<T> next() {
        return queue.poll();
      }
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
   };
  }
}

  最后的處理代碼通過引入這個函數并無太大變動,只有一個干凈的且緊密的循環,不必關心文件、緩沖區、時間周期的分組元素。PeriodEntries也是足夠的靈活管理任何時長的時間。

TrueFxDecoder decoder = new TrueFxDecoder();
FileReader<TrueFxData> reader = FileReader.create(decoder, file.listFiles());
long periodLength = TimeUnit.DAYS.toMillis(1);
PeriodEntries<TrueFxData> periods = PeriodEntries.create(reader, periodLength);
 
for (List<TrueFxData> entries : periods) {
   // data for each day
   for (TrueFxData entry : entries) {
     // process each entry
   }
}

  你也許意識到了,使用集合不可能解決這樣的問題;選擇迭代器是一個關鍵的設計決策,能夠解析兆字節的數組,且不會消耗過多的空間。

  原文鏈接: javacodegeeks 翻譯: ImportNew - 踏雁尋花

QQ群:WEB開發者官方群(515171538),驗證消息:10000
微信群:加小編微信 849023636 邀請您加入,驗證消息:10000
提示:更多精彩內容關注微信公眾號:全棧開發者中心(fsder-com)
網友評論(共0條評論) 正在載入評論......
理智評論文明上網,拒絕惡意謾罵 發表評論 / 共0條評論
登錄會員中心
大神带着买彩票 沽源县| 疏附县| 新疆| 巴南区| 平乡县| 城市| 安国市| 关岭| 抚顺市| 伊吾县| 天祝| 平阳县| 屏南县| 阳朔县| 额尔古纳市| 鸡东县| 察雅县| 海兴县| 新沂市| 天台县| 岫岩| 崇文区| 汪清县| 石屏县| 海城市| 鹿邑县| 三原县| 宣城市| 行唐县| 阳西县| 阿拉尔市| 会东县| 永州市| 洛宁县| 荃湾区| 崇阳县| 揭阳市| 遂昌县|