Re: svn commit: r966168 [1/4] - in /lucene/dev: branches/realtime_search/lucene/src/java/org/apache/lucene/index/ branches/realtime_search/lucene/src/java/org/apache/lucene/util/ branches/realtime_search/lucene/src/test/org/apache/lucene/index/ trunk/lucen...

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Re: svn commit: r966168 [1/4] - in /lucene/dev: branches/realtime_search/lucene/src/java/org/apache/lucene/index/ branches/realtime_search/lucene/src/java/org/apache/lucene/util/ branches/realtime_search/lucene/src/test/org/apache/lucene/index/ trunk/lucen...

Michael Busch
  Oops, I messed something up.  Two files ended up in trunk (something
went wrong with my svn switch) :(

Will try to fix it now...

  Michael

On 7/21/10 3:27 AM, [hidden email] wrote:

> Author: buschmi
> Date: Wed Jul 21 10:27:20 2010
> New Revision: 966168
>
> URL: http://svn.apache.org/viewvc?rev=966168&view=rev
> Log:
> LUCENE-2324: Committing second version of the patch to the real-time branch.  It's not done yet, but easier to track progress using the branch.
>
> Added:
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/util/ThreadSafeCloneableSortedMap.java
> Removed:
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumerPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxFieldMergeState.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumerPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumerPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumerPerThread.java
>      lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashPerThread.java
> Modified:
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumer.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestByteSlices.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java
>      lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
>      lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
>
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,70 @@
> +package org.apache.lucene.index;
> +
> +import java.util.TreeMap;
> +
> +import org.apache.lucene.search.Query;
> +import org.apache.lucene.util.ThreadSafeCloneableSortedMap;
> +
> +public class BufferedDeletesInRAM {
> +  static class Delete {
> +    int flushCount;
> +
> +    public Delete(int flushCount) {
> +      this.flushCount = flushCount;
> +    }
> +  }
> +
> +  final static class DeleteTerm extends Delete {
> +    final Term term;
> +
> +    public DeleteTerm(Term term, int flushCount) {
> +      super(flushCount);
> +      this.term = term;
> +    }
> +  }
> +
> +  final static class DeleteTerms extends Delete {
> +    final Term[] terms;
> +
> +    public DeleteTerms(Term[] terms, int flushCount) {
> +      super(flushCount);
> +      this.terms = terms;
> +    }
> +  }
> +
> +  final static class DeleteQuery extends Delete {
> +    final Query query;
> +
> +    public DeleteQuery(Query query, int flushCount) {
> +      super(flushCount);
> +      this.query = query;
> +    }
> +  }
> +
> +  final ThreadSafeCloneableSortedMap<Long, Delete>  deletes = ThreadSafeCloneableSortedMap
> +      .getThreadSafeSortedMap(new TreeMap<Long, Delete>());
> +
> +  final void addDeleteTerm(Term term, long sequenceID, int numThreadStates) {
> +    deletes.put(sequenceID, new DeleteTerm(term, numThreadStates));
> +  }
> +
> +  final void addDeleteTerms(Term[] terms, long sequenceID, int numThreadStates) {
> +    deletes.put(sequenceID, new DeleteTerms(terms, numThreadStates));
> +  }
> +
> +  final void addDeleteQuery(Query query, long sequenceID, int numThreadStates) {
> +    deletes.put(sequenceID, new DeleteQuery(query, numThreadStates));
> +  }
> +
> +  boolean hasDeletes() {
> +    return !deletes.isEmpty();
> +  }
> +
> +  void clear() {
> +    deletes.clear();
> +  }
> +
> +  int getNumDeletes() {
> +    return this.deletes.size();
> +  }
> +}
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java Wed Jul 21 10:27:20 2010
> @@ -50,10 +50,10 @@ final class ByteBlockPool {
>     public byte[][] buffers = new byte[10][];
>
>     int bufferUpto = -1;                        // Which buffer we are upto
> -  public int byteUpto = DocumentsWriter.BYTE_BLOCK_SIZE;             // Where we are in head buffer
> +  public int byteUpto = DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;             // Where we are in head buffer
>
>     public byte[] buffer;                              // Current head buffer
> -  public int byteOffset = -DocumentsWriter.BYTE_BLOCK_SIZE;          // Current head offset
> +  public int byteOffset = -DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;          // Current head offset
>
>     private final Allocator allocator;
>
> @@ -95,11 +95,11 @@ final class ByteBlockPool {
>       bufferUpto++;
>
>       byteUpto = 0;
> -    byteOffset += DocumentsWriter.BYTE_BLOCK_SIZE;
> +    byteOffset += DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
>     }
>
>     public int newSlice(final int size) {
> -    if (byteUpto>  DocumentsWriter.BYTE_BLOCK_SIZE-size)
> +    if (byteUpto>  DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-size)
>         nextBuffer();
>       final int upto = byteUpto;
>       byteUpto += size;
> @@ -123,7 +123,7 @@ final class ByteBlockPool {
>       final int newSize = levelSizeArray[newLevel];
>
>       // Maybe allocate another block
> -    if (byteUpto>  DocumentsWriter.BYTE_BLOCK_SIZE-newSize)
> +    if (byteUpto>  DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-newSize)
>         nextBuffer();
>
>       final int newUpto = byteUpto;
> @@ -151,8 +151,8 @@ final class ByteBlockPool {
>     // Fill in a BytesRef from term's length&  bytes encoded in
>     // byte block
>     final BytesRef setBytesRef(BytesRef term, int textStart) {
> -    final byte[] bytes = term.bytes = buffers[textStart>>  DocumentsWriter.BYTE_BLOCK_SHIFT];
> -    int pos = textStart&  DocumentsWriter.BYTE_BLOCK_MASK;
> +    final byte[] bytes = term.bytes = buffers[textStart>>  DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
> +    int pos = textStart&  DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>       if ((bytes[pos]&  0x80) == 0) {
>         // length is 1 byte
>         term.length = bytes[pos];
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java Wed Jul 21 10:27:20 2010
> @@ -48,16 +48,16 @@ final class ByteSliceReader extends Data
>       this.endIndex = endIndex;
>
>       level = 0;
> -    bufferUpto = startIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
> -    bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
> +    bufferUpto = startIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
> +    bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
>       buffer = pool.buffers[bufferUpto];
> -    upto = startIndex&  DocumentsWriter.BYTE_BLOCK_MASK;
> +    upto = startIndex&  DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>
>       final int firstSize = ByteBlockPool.levelSizeArray[0];
>
>       if (startIndex+firstSize>= endIndex) {
>         // There is only this one slice to read
> -      limit = endIndex&  DocumentsWriter.BYTE_BLOCK_MASK;
> +      limit = endIndex&  DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>       } else
>         limit = upto+firstSize-4;
>     }
> @@ -102,11 +102,11 @@ final class ByteSliceReader extends Data
>       level = ByteBlockPool.nextLevelArray[level];
>       final int newSize = ByteBlockPool.levelSizeArray[level];
>
> -    bufferUpto = nextIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
> -    bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
> +    bufferUpto = nextIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
> +    bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
>
>       buffer = pool.buffers[bufferUpto];
> -    upto = nextIndex&  DocumentsWriter.BYTE_BLOCK_MASK;
> +    upto = nextIndex&  DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>
>       if (nextIndex + newSize>= endIndex) {
>         // We are advancing to the final slice
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java Wed Jul 21 10:27:20 2010
> @@ -42,9 +42,9 @@ final class ByteSliceWriter extends Data
>      * Set up the writer to write at address.
>      */
>     public void init(int address) {
> -    slice = pool.buffers[address>>  DocumentsWriter.BYTE_BLOCK_SHIFT];
> +    slice = pool.buffers[address>>  DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
>       assert slice != null;
> -    upto = address&  DocumentsWriter.BYTE_BLOCK_MASK;
> +    upto = address&  DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>       offset0 = address;
>       assert upto<  slice.length;
>     }
> @@ -80,6 +80,6 @@ final class ByteSliceWriter extends Data
>     }
>
>     public int getAddress() {
> -    return upto + (offset0&  DocumentsWriter.BYTE_BLOCK_NOT_MASK);
> +    return upto + (offset0&  DocumentsWriterRAMAllocator.BYTE_BLOCK_NOT_MASK);
>     }
>   }
> \ No newline at end of file
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java Wed Jul 21 10:27:20 2010
> @@ -18,11 +18,10 @@ package org.apache.lucene.index;
>    */
>
>   import java.io.IOException;
> -import java.util.Collection;
>
>   abstract class DocConsumer {
> -  abstract DocConsumerPerThread addThread(DocumentsWriterThreadState perThread) throws IOException;
> -  abstract void flush(final Collection<DocConsumerPerThread>  threads, final SegmentWriteState state) throws IOException;
> +  abstract DocumentsWriterPerThread.DocWriter processDocument() throws IOException;
> +  abstract void flush(final SegmentWriteState state) throws IOException;
>     abstract void closeDocStore(final SegmentWriteState state) throws IOException;
>     abstract void abort();
>     abstract boolean freeRAM();
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java Wed Jul 21 10:27:20 2010
> @@ -18,7 +18,6 @@ package org.apache.lucene.index;
>    */
>
>   import java.io.IOException;
> -import java.util.Collection;
>   import java.util.Map;
>
>   abstract class DocFieldConsumer {
> @@ -27,7 +26,7 @@ abstract class DocFieldConsumer {
>
>     /** Called when DocumentsWriter decides to create a new
>      *  segment */
> -  abstract void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>  threadsAndFields, SegmentWriteState state) throws IOException;
> +  abstract void flush(Map<FieldInfo, DocFieldConsumerPerField>  fieldsToFlush, SegmentWriteState state) throws IOException;
>
>     /** Called when DocumentsWriter decides to close the doc
>      *  stores */
> @@ -36,14 +35,17 @@ abstract class DocFieldConsumer {
>     /** Called when an aborting exception is hit */
>     abstract void abort();
>
> -  /** Add a new thread */
> -  abstract DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException;
> -
>     /** Called when DocumentsWriter is using too much RAM.
>      *  The consumer should free RAM, if possible, returning
>      *  true if any RAM was in fact freed. */
>     abstract boolean freeRAM();
> +
> +  abstract void startDocument() throws IOException;
>
> +  abstract DocFieldConsumerPerField addField(FieldInfo fi);
> +
> +  abstract DocumentsWriterPerThread.DocWriter finishDocument() throws IOException;
> +
>     void setFieldInfos(FieldInfos fieldInfos) {
>       this.fieldInfos = fieldInfos;
>     }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java Wed Jul 21 10:27:20 2010
> @@ -24,4 +24,5 @@ abstract class DocFieldConsumerPerField
>     /** Processes all occurrences of a single field */
>     abstract void processFields(Fieldable[] fields, int count) throws IOException;
>     abstract void abort();
> +  abstract FieldInfo getFieldInfo();
>   }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java Wed Jul 21 10:27:20 2010
> @@ -17,12 +17,9 @@ package org.apache.lucene.index;
>    * limitations under the License.
>    */
>
> +import java.io.IOException;
>   import java.util.HashMap;
> -import java.util.Collection;
> -import java.util.Iterator;
>   import java.util.Map;
> -import java.util.HashSet;
> -import java.io.IOException;
>
>   import org.apache.lucene.util.ArrayUtil;
>   import org.apache.lucene.util.RamUsageEstimator;
> @@ -33,10 +30,12 @@ import org.apache.lucene.util.RamUsageEs
>   final class DocFieldConsumers extends DocFieldConsumer {
>     final DocFieldConsumer one;
>     final DocFieldConsumer two;
> +  final DocumentsWriterPerThread.DocState docState;
>
> -  public DocFieldConsumers(DocFieldConsumer one, DocFieldConsumer two) {
> +  public DocFieldConsumers(DocFieldProcessor processor, DocFieldConsumer one, DocFieldConsumer two) {
>       this.one = one;
>       this.two = two;
> +    this.docState = processor.docState;
>     }
>
>     @Override
> @@ -47,33 +46,19 @@ final class DocFieldConsumers extends Do
>     }
>
>     @Override
> -  public void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>  threadsAndFields, SegmentWriteState state) throws IOException {
> -
> -    Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>  oneThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
> -    Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>  twoThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
> -
> -    for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>  entry : threadsAndFields.entrySet()) {
> +  public void flush(Map<FieldInfo, DocFieldConsumerPerField>  fieldsToFlush, SegmentWriteState state) throws IOException {
>
> -      final DocFieldConsumersPerThread perThread = (DocFieldConsumersPerThread) entry.getKey();
> +    Map<FieldInfo, DocFieldConsumerPerField>  oneFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
> +    Map<FieldInfo, DocFieldConsumerPerField>  twoFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
>
> -      final Collection<DocFieldConsumerPerField>  fields = entry.getValue();
> -
> -      Iterator<DocFieldConsumerPerField>  fieldsIt = fields.iterator();
> -      Collection<DocFieldConsumerPerField>  oneFields = new HashSet<DocFieldConsumerPerField>();
> -      Collection<DocFieldConsumerPerField>  twoFields = new HashSet<DocFieldConsumerPerField>();
> -      while(fieldsIt.hasNext()) {
> -        DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldsIt.next();
> -        oneFields.add(perField.one);
> -        twoFields.add(perField.two);
> -      }
> -
> -      oneThreadsAndFields.put(perThread.one, oneFields);
> -      twoThreadsAndFields.put(perThread.two, twoFields);
> +    for (Map.Entry<FieldInfo, DocFieldConsumerPerField>  fieldToFlush : fieldsToFlush.entrySet()) {
> +      DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldToFlush.getValue();
> +      oneFieldsToFlush.put(fieldToFlush.getKey(), perField.one);
> +      twoFieldsToFlush.put(fieldToFlush.getKey(), perField.two);
>       }
> -
>
> -    one.flush(oneThreadsAndFields, state);
> -    two.flush(twoThreadsAndFields, state);
> +    one.flush(oneFieldsToFlush, state);
> +    two.flush(twoFieldsToFlush, state);
>     }
>
>     @Override
> @@ -101,16 +86,11 @@ final class DocFieldConsumers extends Do
>       return any;
>     }
>
> -  @Override
> -  public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException {
> -    return new DocFieldConsumersPerThread(docFieldProcessorPerThread, this, one.addThread(docFieldProcessorPerThread), two.addThread(docFieldProcessorPerThread));
> -  }
> -
>     PerDoc[] docFreeList = new PerDoc[1];
>     int freeCount;
>     int allocCount;
>
> -  synchronized PerDoc getPerDoc() {
> +  PerDoc getPerDoc() {
>       if (freeCount == 0) {
>         allocCount++;
>         if (allocCount>  docFreeList.length) {
> @@ -125,15 +105,15 @@ final class DocFieldConsumers extends Do
>         return docFreeList[--freeCount];
>     }
>
> -  synchronized void freePerDoc(PerDoc perDoc) {
> +  void freePerDoc(PerDoc perDoc) {
>       assert freeCount<  docFreeList.length;
>       docFreeList[freeCount++] = perDoc;
>     }
>
> -  class PerDoc extends DocumentsWriter.DocWriter {
> +  class PerDoc extends DocumentsWriterPerThread.DocWriter {
>
> -    DocumentsWriter.DocWriter writerOne;
> -    DocumentsWriter.DocWriter writerTwo;
> +    DocumentsWriterPerThread.DocWriter writerOne;
> +    DocumentsWriterPerThread.DocWriter writerTwo;
>
>       @Override
>       public long sizeInBytes() {
> @@ -166,4 +146,35 @@ final class DocFieldConsumers extends Do
>         }
>       }
>     }
> +
> +  @Override
> +  public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
> +    final DocumentsWriterPerThread.DocWriter oneDoc = one.finishDocument();
> +    final DocumentsWriterPerThread.DocWriter twoDoc = two.finishDocument();
> +    if (oneDoc == null)
> +      return twoDoc;
> +    else if (twoDoc == null)
> +      return oneDoc;
> +    else {
> +      DocFieldConsumers.PerDoc both = getPerDoc();
> +      both.docID = docState.docID;
> +      assert oneDoc.docID == docState.docID;
> +      assert twoDoc.docID == docState.docID;
> +      both.writerOne = oneDoc;
> +      both.writerTwo = twoDoc;
> +      return both;
> +    }
> +  }
> +
> +  @Override
> +  public void startDocument() throws IOException {
> +    one.startDocument();
> +    two.startDocument();
> +  }
> +
> +  @Override
> +  public DocFieldConsumerPerField addField(FieldInfo fi) {
> +    return new DocFieldConsumersPerField(this, fi, one.addField(fi), two.addField(fi));
> +  }
> +
>   }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java Wed Jul 21 10:27:20 2010
> @@ -24,12 +24,14 @@ final class DocFieldConsumersPerField ex
>
>     final DocFieldConsumerPerField one;
>     final DocFieldConsumerPerField two;
> -  final DocFieldConsumersPerThread perThread;
> +  final DocFieldConsumers parent;
> +  final FieldInfo fieldInfo;
>
> -  public DocFieldConsumersPerField(DocFieldConsumersPerThread perThread, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
> -    this.perThread = perThread;
> +  public DocFieldConsumersPerField(DocFieldConsumers parent, FieldInfo fi, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
> +    this.parent = parent;
>       this.one = one;
>       this.two = two;
> +    this.fieldInfo = fi;
>     }
>
>     @Override
> @@ -46,4 +48,9 @@ final class DocFieldConsumersPerField ex
>         two.abort();
>       }
>     }
> +
> +  @Override
> +  FieldInfo getFieldInfo() {
> +    return fieldInfo;
> +  }
>   }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java Wed Jul 21 10:27:20 2010
> @@ -19,8 +19,15 @@ package org.apache.lucene.index;
>
>   import java.io.IOException;
>   import java.util.Collection;
> -import java.util.Map;
>   import java.util.HashMap;
> +import java.util.HashSet;
> +import java.util.List;
> +import java.util.Map;
> +
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.document.Fieldable;
> +import org.apache.lucene.util.ArrayUtil;
> +import org.apache.lucene.util.RamUsageEstimator;
>
>
>   /**
> @@ -33,13 +40,27 @@ import java.util.HashMap;
>
>   final class DocFieldProcessor extends DocConsumer {
>
> -  final DocumentsWriter docWriter;
>     final FieldInfos fieldInfos = new FieldInfos();
>     final DocFieldConsumer consumer;
>     final StoredFieldsWriter fieldsWriter;
>
> -  public DocFieldProcessor(DocumentsWriter docWriter, DocFieldConsumer consumer) {
> -    this.docWriter = docWriter;
> +  // Holds all fields seen in current doc
> +  DocFieldProcessorPerField[] fields = new DocFieldProcessorPerField[1];
> +  int fieldCount;
> +
> +  // Hash table for all fields ever seen
> +  DocFieldProcessorPerField[] fieldHash = new DocFieldProcessorPerField[2];
> +  int hashMask = 1;
> +  int totalFieldCount;
> +
> +
> +  float docBoost;
> +  int fieldGen;
> +  final DocumentsWriterPerThread.DocState docState;
> +
> +
> +  public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
> +    this.docState = docWriter.docState;
>       this.consumer = consumer;
>       consumer.setFieldInfos(fieldInfos);
>       fieldsWriter = new StoredFieldsWriter(docWriter, fieldInfos);
> @@ -52,16 +73,17 @@ final class DocFieldProcessor extends Do
>     }
>
>     @Override
> -  public void flush(Collection<DocConsumerPerThread>  threads, SegmentWriteState state) throws IOException {
> +  public void flush(SegmentWriteState state) throws IOException {
>
> -    Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>  childThreadsAndFields = new HashMap<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>();
> -    for ( DocConsumerPerThread thread : threads) {
> -      DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
> -      childThreadsAndFields.put(perThread.consumer, perThread.fields());
> -      perThread.trimFields(state);
> +    Map<FieldInfo, DocFieldConsumerPerField> childFields = new HashMap<FieldInfo, DocFieldConsumerPerField>();
> +    Collection<DocFieldConsumerPerField> fields = fields();
> +    for (DocFieldConsumerPerField f : fields) {
> +      childFields.put(f.getFieldInfo(), f);
>       }
> +    trimFields(state);
> +
>       fieldsWriter.flush(state);
> -    consumer.flush(childThreadsAndFields, state);
> +    consumer.flush(childFields, state);
>
>       // Important to save after asking consumer to flush so
>       // consumer can alter the FieldInfo* if necessary.  EG,
> @@ -74,6 +96,15 @@ final class DocFieldProcessor extends Do
>
>     @Override
>     public void abort() {
> +    for(int i=0;i<fieldHash.length;i++) {
> +      DocFieldProcessorPerField field = fieldHash[i];
> +      while(field != null) {
> +        final DocFieldProcessorPerField next = field.next;
> +        field.abort();
> +        field = next;
> +      }
> +    }
> +
>       fieldsWriter.abort();
>       consumer.abort();
>     }
> @@ -82,9 +113,317 @@ final class DocFieldProcessor extends Do
>     public boolean freeRAM() {
>       return consumer.freeRAM();
>     }
> +
> +  public Collection<DocFieldConsumerPerField> fields() {
> +    Collection<DocFieldConsumerPerField> fields = new HashSet<DocFieldConsumerPerField>();
> +    for(int i=0;i<fieldHash.length;i++) {
> +      DocFieldProcessorPerField field = fieldHash[i];
> +      while(field != null) {
> +        fields.add(field.consumer);
> +        field = field.next;
> +      }
> +    }
> +    assert fields.size() == totalFieldCount;
> +    return fields;
> +  }
> +
> +  /** If there are fields we've seen but did not see again
> +   *  in the last run, then free them up. */
> +
> +  void trimFields(SegmentWriteState state) {
> +
> +    for(int i=0;i<fieldHash.length;i++) {
> +      DocFieldProcessorPerField perField = fieldHash[i];
> +      DocFieldProcessorPerField lastPerField = null;
> +
> +      while (perField != null) {
> +
> +        if (perField.lastGen == -1) {
> +
> +          // This field was not seen since the previous
> +          // flush, so, free up its resources now
> +
> +          // Unhash
> +          if (lastPerField == null)
> +            fieldHash[i] = perField.next;
> +          else
> +            lastPerField.next = perField.next;
> +
> +          if (state.infoStream != null) {
> +            state.infoStream.println("  purge field=" + perField.fieldInfo.name);
> +          }
> +
> +          totalFieldCount--;
> +
> +        } else {
> +          // Reset
> +          perField.lastGen = -1;
> +          lastPerField = perField;
> +        }
> +
> +        perField = perField.next;
> +      }
> +    }
> +  }
> +
> +  private void rehash() {
> +    final int newHashSize = (fieldHash.length*2);
> +    assert newHashSize > fieldHash.length;
> +
> +    final DocFieldProcessorPerField newHashArray[] = new DocFieldProcessorPerField[newHashSize];
> +
> +    // Rehash
> +    int newHashMask = newHashSize-1;
> +    for(int j=0;j<fieldHash.length;j++) {
> +      DocFieldProcessorPerField fp0 = fieldHash[j];
> +      while(fp0 != null) {
> +        final int hashPos2 = fp0.fieldInfo.name.hashCode() & newHashMask;
> +        DocFieldProcessorPerField nextFP0 = fp0.next;
> +        fp0.next = newHashArray[hashPos2];
> +        newHashArray[hashPos2] = fp0;
> +        fp0 = nextFP0;
> +      }
> +    }
> +
> +    fieldHash = newHashArray;
> +    hashMask = newHashMask;
> +  }
>
>     @Override
> -  public DocConsumerPerThread addThread(DocumentsWriterThreadState threadState) throws IOException {
> -    return new DocFieldProcessorPerThread(threadState, this);
> +  public DocumentsWriterPerThread.DocWriter processDocument() throws IOException {
> +
> +    consumer.startDocument();
> +    fieldsWriter.startDocument();
> +
> +    final Document doc = docState.doc;
> +
> +    fieldCount = 0;
> +
> +    final int thisFieldGen = fieldGen++;
> +
> +    final List<Fieldable> docFields = doc.getFields();
> +    final int numDocFields = docFields.size();
> +
> +    // Absorb any new fields first seen in this document.
> +    // Also absorb any changes to fields we had already
> +    // seen before (eg suddenly turning on norms or
> +    // vectors, etc.):
> +
> +    for(int i=0;i<numDocFields;i++) {
> +      Fieldable field = docFields.get(i);
> +      final String fieldName = field.name();
> +
> +      // Make sure we have a PerField allocated
> +      final int hashPos = fieldName.hashCode() & hashMask;
> +      DocFieldProcessorPerField fp = fieldHash[hashPos];
> +      while(fp != null && !fp.fieldInfo.name.equals(fieldName)) {
> +        fp = fp.next;
> +      }
> +
> +      if (fp == null) {
> +
> +        // TODO FI: we need to genericize the "flags" that a
> +        // field holds, and, how these flags are merged; it
> +        // needs to be more "pluggable" such that if I want
> +        // to have a new "thing" my Fields can do, I can
> +        // easily add it
> +        FieldInfo fi = fieldInfos.add(fieldName, field.isIndexed(), field.isTermVectorStored(),
> +                                      field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
> +                                      field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
> +
> +        fp = new DocFieldProcessorPerField(this, fi);
> +        fp.next = fieldHash[hashPos];
> +        fieldHash[hashPos] = fp;
> +        totalFieldCount++;
> +
> +        if (totalFieldCount >= fieldHash.length/2)
> +          rehash();
> +      } else {
> +        fp.fieldInfo.update(field.isIndexed(), field.isTermVectorStored(),
> +                            field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
> +                            field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
> +      }
> +
> +      if (thisFieldGen != fp.lastGen) {
> +
> +        // First time we're seeing this field for this doc
> +        fp.fieldCount = 0;
> +
> +        if (fieldCount == fields.length) {
> +          final int newSize = fields.length*2;
> +          DocFieldProcessorPerField newArray[] = new DocFieldProcessorPerField[newSize];
> +          System.arraycopy(fields, 0, newArray, 0, fieldCount);
> +          fields = newArray;
> +        }
> +
> +        fields[fieldCount++] = fp;
> +        fp.lastGen = thisFieldGen;
> +      }
> +
> +      if (fp.fieldCount == fp.fields.length) {
> +        Fieldable[] newArray = new Fieldable[fp.fields.length*2];
> +        System.arraycopy(fp.fields, 0, newArray, 0, fp.fieldCount);
> +        fp.fields = newArray;
> +      }
> +
> +      fp.fields[fp.fieldCount++] = field;
> +      if (field.isStored()) {
> +        fieldsWriter.addField(field, fp.fieldInfo);
> +      }
> +    }
> +
> +    // If we are writing vectors then we must visit
> +    // fields in sorted order so they are written in
> +    // sorted order.  TODO: we actually only need to
> +    // sort the subset of fields that have vectors
> +    // enabled; we could save [small amount of] CPU
> +    // here.
> +    quickSort(fields, 0, fieldCount-1);
> +
> +    for(int i=0;i<fieldCount;i++)
> +      fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount);
> +
> +    if (docState.maxTermPrefix != null && docState.infoStream != null) {
> +      docState.infoStream.println("WARNING: document contains at least one immense term (whose UTF8 encoding is longer than the max length " + DocumentsWriterRAMAllocator.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped.  Please correct the analyzer to not produce such terms.  The prefix of the first immense term is: '" + docState.maxTermPrefix + "...'");
> +      docState.maxTermPrefix = null;
> +    }
> +
> +    final DocumentsWriterPerThread.DocWriter one = fieldsWriter.finishDocument();
> +    final DocumentsWriterPerThread.DocWriter two = consumer.finishDocument();
> +    if (one == null) {
> +      return two;
> +    } else if (two == null) {
> +      return one;
> +    } else {
> +      PerDoc both = getPerDoc();
> +      both.docID = docState.docID;
> +      assert one.docID == docState.docID;
> +      assert two.docID == docState.docID;
> +      both.one = one;
> +      both.two = two;
> +      return both;
> +    }
> +  }
> +
> +  void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) {
> +    if (lo >= hi)
> +      return;
> +    else if (hi == 1+lo) {
> +      if (array[lo].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
> +        final DocFieldProcessorPerField tmp = array[lo];
> +        array[lo] = array[hi];
> +        array[hi] = tmp;
> +      }
> +      return;
> +    }
> +
> +    int mid = (lo + hi) >>> 1;
> +
> +    if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
> +      DocFieldProcessorPerField tmp = array[lo];
> +      array[lo] = array[mid];
> +      array[mid] = tmp;
> +    }
> +
> +    if (array[mid].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
> +      DocFieldProcessorPerField tmp = array[mid];
> +      array[mid] = array[hi];
> +      array[hi] = tmp;
> +
> +      if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
> +        DocFieldProcessorPerField tmp2 = array[lo];
> +        array[lo] = array[mid];
> +        array[mid] = tmp2;
> +      }
> +    }
> +
> +    int left = lo + 1;
> +    int right = hi - 1;
> +
> +    if (left >= right)
> +      return;
> +
> +    DocFieldProcessorPerField partition = array[mid];
> +
> +    for (; ;) {
> +      while (array[right].fieldInfo.name.compareTo(partition.fieldInfo.name) > 0)
> +        --right;
> +
> +      while (left < right && array[left].fieldInfo.name.compareTo(partition.fieldInfo.name) <= 0)
> +        ++left;
> +
> +      if (left < right) {
> +        DocFieldProcessorPerField tmp = array[left];
> +        array[left] = array[right];
> +        array[right] = tmp;
> +        --right;
> +      } else {
> +        break;
> +      }
> +    }
> +
> +    quickSort(array, lo, left);
> +    quickSort(array, left + 1, hi);
> +  }
> +
> +  PerDoc[] docFreeList = new PerDoc[1];
> +  int freeCount;
> +  int allocCount;
> +
> +  PerDoc getPerDoc() {
> +    if (freeCount == 0) {
> +      allocCount++;
> +      if (allocCount > docFreeList.length) {
> +        // Grow our free list up front to make sure we have
> +        // enough space to recycle all outstanding PerDoc
> +        // instances
> +        assert allocCount == 1+docFreeList.length;
> +        docFreeList = new PerDoc[ArrayUtil.oversize(allocCount, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
> +      }
> +      return new PerDoc();
> +    } else
> +      return docFreeList[--freeCount];
> +  }
> +
> +  void freePerDoc(PerDoc perDoc) {
> +    assert freeCount < docFreeList.length;
> +    docFreeList[freeCount++] = perDoc;
> +  }
> +
> +  class PerDoc extends DocumentsWriterPerThread.DocWriter {
> +
> +    DocumentsWriterPerThread.DocWriter one;
> +    DocumentsWriterPerThread.DocWriter two;
> +
> +    @Override
> +    public long sizeInBytes() {
> +      return one.sizeInBytes() + two.sizeInBytes();
> +    }
> +
> +    @Override
> +    public void finish() throws IOException {
> +      try {
> +        try {
> +          one.finish();
> +        } finally {
> +          two.finish();
> +        }
> +      } finally {
> +        freePerDoc(this);
> +      }
> +    }
> +
> +    @Override
> +    public void abort() {
> +      try {
> +        try {
> +          one.abort();
> +        } finally {
> +          two.abort();
> +        }
> +      } finally {
> +        freePerDoc(this);
> +      }
> +    }
>     }
>   }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java Wed Jul 21 10:27:20 2010
> @@ -34,8 +34,8 @@ final class DocFieldProcessorPerField {
>     int fieldCount;
>     Fieldable[] fields = new Fieldable[1];
>
> -  public DocFieldProcessorPerField(final DocFieldProcessorPerThread perThread, final FieldInfo fieldInfo) {
> -    this.consumer = perThread.consumer.addField(fieldInfo);
> +  public DocFieldProcessorPerField(final DocFieldProcessor docFieldProcessor, final FieldInfo fieldInfo) {
> +    this.consumer = docFieldProcessor.consumer.addField(fieldInfo);
>       this.fieldInfo = fieldInfo;
>     }
>
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java Wed Jul 21 10:27:20 2010
> @@ -18,12 +18,13 @@ package org.apache.lucene.index;
>    */
>
>   import java.io.IOException;
> -import java.util.Collection;
>   import java.util.HashMap;
> -import java.util.HashSet;
> -
>   import java.util.Map;
>
> +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
> +import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
> +import org.apache.lucene.util.AttributeSource;
> +
>
>   /** This is a DocFieldConsumer that inverts each field,
>    *  separately, from a Document, and accepts a
> @@ -34,7 +35,32 @@ final class DocInverter extends DocField
>     final InvertedDocConsumer consumer;
>     final InvertedDocEndConsumer endConsumer;
>
> -  public DocInverter(InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
> +  final DocumentsWriterPerThread.DocState docState;
> +
> +  final FieldInvertState fieldState = new FieldInvertState();
> +
> +  final SingleTokenAttributeSource singleToken = new SingleTokenAttributeSource();
> +
> +  static class SingleTokenAttributeSource extends AttributeSource {
> +    final CharTermAttribute termAttribute;
> +    final OffsetAttribute offsetAttribute;
> +
> +    private SingleTokenAttributeSource() {
> +      termAttribute = addAttribute(CharTermAttribute.class);
> +      offsetAttribute = addAttribute(OffsetAttribute.class);
> +    }
> +
> +    public void reinit(String stringValue, int startOffset,  int endOffset) {
> +      termAttribute.setEmpty().append(stringValue);
> +      offsetAttribute.setOffset(startOffset, endOffset);
> +    }
> +  }
> +
> +  // Used to read a string value for a field
> +  final ReusableStringReader stringReader = new ReusableStringReader();
> +
> +  public DocInverter(DocumentsWriterPerThread.DocState docState, InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
> +    this.docState = docState;
>       this.consumer = consumer;
>       this.endConsumer = endConsumer;
>     }
> @@ -47,33 +73,37 @@ final class DocInverter extends DocField
>     }
>
>     @Override
> -  void flush(Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
> -
> -    Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>> childThreadsAndFields = new HashMap<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>();
> -    Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> endChildThreadsAndFields = new HashMap<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>>();
> -
> -    for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet() ) {
> +  void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
>
> +    Map<FieldInfo, InvertedDocConsumerPerField> childFieldsToFlush = new HashMap<FieldInfo, InvertedDocConsumerPerField>();
> +    Map<FieldInfo, InvertedDocEndConsumerPerField> endChildFieldsToFlush = new HashMap<FieldInfo, InvertedDocEndConsumerPerField>();
>
> -      DocInverterPerThread perThread = (DocInverterPerThread) entry.getKey();
> -
> -      Collection<InvertedDocConsumerPerField> childFields = new HashSet<InvertedDocConsumerPerField>();
> -      Collection<InvertedDocEndConsumerPerField> endChildFields = new HashSet<InvertedDocEndConsumerPerField>();
> -      for (final DocFieldConsumerPerField field: entry.getValue() ) {
> -        DocInverterPerField perField = (DocInverterPerField) field;
> -        childFields.add(perField.consumer);
> -        endChildFields.add(perField.endConsumer);
> -      }
> -
> -      childThreadsAndFields.put(perThread.consumer, childFields);
> -      endChildThreadsAndFields.put(perThread.endConsumer, endChildFields);
> +    for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
> +      DocInverterPerField perField = (DocInverterPerField) fieldToFlush.getValue();
> +      childFieldsToFlush.put(fieldToFlush.getKey(), perField.consumer);
> +      endChildFieldsToFlush.put(fieldToFlush.getKey(), perField.endConsumer);
>       }
>
> -    consumer.flush(childThreadsAndFields, state);
> -    endConsumer.flush(endChildThreadsAndFields, state);
> +    consumer.flush(childFieldsToFlush, state);
> +    endConsumer.flush(endChildFieldsToFlush, state);
> +  }
> +
> +  @Override
> +  public void startDocument() throws IOException {
> +    consumer.startDocument();
> +    endConsumer.startDocument();
>     }
>
>     @Override
> +  public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
> +    // TODO: allow endConsumer.finishDocument to also return
> +    // a DocWriter
> +    endConsumer.finishDocument();
> +    return consumer.finishDocument();
> +  }
> +
> +
> +  @Override
>     public void closeDocStore(SegmentWriteState state) throws IOException {
>       consumer.closeDocStore(state);
>       endConsumer.closeDocStore(state);
> @@ -81,17 +111,21 @@ final class DocInverter extends DocField
>
>     @Override
>     void abort() {
> -    consumer.abort();
> -    endConsumer.abort();
> +    try {
> +      consumer.abort();
> +    } finally {
> +      endConsumer.abort();
> +    }
>     }
>
>     @Override
>     public boolean freeRAM() {
>       return consumer.freeRAM();
>     }
> -
> +
>     @Override
> -  public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) {
> -    return new DocInverterPerThread(docFieldProcessorPerThread, this);
> +  public DocFieldConsumerPerField addField(FieldInfo fi) {
> +    return new DocInverterPerField(this, fi);
>     }
> +
>   }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java Wed Jul 21 10:27:20 2010
> @@ -35,20 +35,20 @@ import org.apache.lucene.analysis.tokena
>
>   final class DocInverterPerField extends DocFieldConsumerPerField {
>
> -  final private DocInverterPerThread perThread;
> -  final private FieldInfo fieldInfo;
> +  final private DocInverter parent;
> +  final FieldInfo fieldInfo;
>     final InvertedDocConsumerPerField consumer;
>     final InvertedDocEndConsumerPerField endConsumer;
> -  final DocumentsWriter.DocState docState;
> +  final DocumentsWriterPerThread.DocState docState;
>     final FieldInvertState fieldState;
>
> -  public DocInverterPerField(DocInverterPerThread perThread, FieldInfo fieldInfo) {
> -    this.perThread = perThread;
> +  public DocInverterPerField(DocInverter parent, FieldInfo fieldInfo) {
> +    this.parent = parent;
>       this.fieldInfo = fieldInfo;
> -    docState = perThread.docState;
> -    fieldState = perThread.fieldState;
> -    this.consumer = perThread.consumer.addField(this, fieldInfo);
> -    this.endConsumer = perThread.endConsumer.addField(this, fieldInfo);
> +    docState = parent.docState;
> +    fieldState = parent.fieldState;
> +    this.consumer = parent.consumer.addField(this, fieldInfo);
> +    this.endConsumer = parent.endConsumer.addField(this, fieldInfo);
>     }
>
>     @Override
> @@ -84,8 +84,8 @@ final class DocInverterPerField extends
>           if (!field.isTokenized()) {  // un-tokenized field
>             String stringValue = field.stringValue();
>             final int valueLength = stringValue.length();
> -          perThread.singleToken.reinit(stringValue, 0, valueLength);
> -          fieldState.attributeSource = perThread.singleToken;
> +          parent.singleToken.reinit(stringValue, 0, valueLength);
> +          fieldState.attributeSource = parent.singleToken;
>             consumer.start(field);
>
>             boolean success = false;
> @@ -93,8 +93,9 @@ final class DocInverterPerField extends
>               consumer.add();
>               success = true;
>             } finally {
> -            if (!success)
> +            if (!success) {
>                 docState.docWriter.setAborting();
> +            }
>             }
>             fieldState.offset += valueLength;
>             fieldState.length++;
> @@ -119,8 +120,8 @@ final class DocInverterPerField extends
>                 if (stringValue == null) {
>                   throw new IllegalArgumentException("field must have either TokenStream, String or Reader value");
>                 }
> -              perThread.stringReader.init(stringValue);
> -              reader = perThread.stringReader;
> +              parent.stringReader.init(stringValue);
> +              reader = parent.stringReader;
>               }
>
>               // Tokenize field and add to postingTable
> @@ -173,8 +174,9 @@ final class DocInverterPerField extends
>                   consumer.add();
>                   success = true;
>                 } finally {
> -                if (!success)
> +                if (!success) {
>                     docState.docWriter.setAborting();
> +                }
>                 }
>                 fieldState.position++;
>                 if (++fieldState.length >= maxFieldLength) {
> @@ -208,4 +210,9 @@ final class DocInverterPerField extends
>       consumer.finish();
>       endConsumer.finish();
>     }
> +
> +  @Override
> +  FieldInfo getFieldInfo() {
> +    return this.fieldInfo;
> +  }
>   }
>
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,459 @@
> +package org.apache.lucene.index;
> +
> +import java.io.IOException;
> +import java.io.PrintStream;
> +import java.util.ArrayList;
> +import java.util.List;
> +
> +import org.apache.lucene.analysis.Analyzer;
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.index.codecs.Codec;
> +import org.apache.lucene.search.Similarity;
> +import org.apache.lucene.store.Directory;
> +import org.apache.lucene.store.RAMFile;
> +import org.apache.lucene.util.ArrayUtil;
> +
> +public class DocumentsWriterPerThread {
> +
> +  /**
> +   * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
> +   * which returns the DocConsumer that the DocumentsWriter calls to process the
> +   * documents.
> +   */
> +  abstract static class IndexingChain {
> +    abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread);
> +  }
> +
> +
> +  static final IndexingChain defaultIndexingChain = new IndexingChain() {
> +
> +    @Override
> +    DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) {
> +      /*
> +      This is the current indexing chain:
> +
> +      DocConsumer / DocConsumerPerThread
> +        --> code: DocFieldProcessor / DocFieldProcessorPerThread
> +          --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
> +            --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
> +              --> code: DocInverter / DocInverterPerThread / DocInverterPerField
> +                --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
> +                  --> code: TermsHash / TermsHashPerThread / TermsHashPerField
> +                    --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
> +                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
> +                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
> +                --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
> +                  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
> +              --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
> +    */
> +
> +    // Build up indexing chain:
> +
> +      final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriterPerThread);
> +      final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
> +
> +      final InvertedDocConsumer  termsHash = new TermsHash(documentsWriterPerThread, freqProxWriter,
> +                                                           new TermsHash(documentsWriterPerThread, termVectorsWriter, null));
> +      final NormsWriter normsWriter = new NormsWriter();
> +      final DocInverter docInverter = new DocInverter(documentsWriterPerThread.docState, termsHash, normsWriter);
> +      return new DocFieldProcessor(documentsWriterPerThread, docInverter);
> +    }
> +  };
> +
> +  static class DocState {
> +    final DocumentsWriterPerThread docWriter;
> +    Analyzer analyzer;
> +    int maxFieldLength;
> +    PrintStream infoStream;
> +    Similarity similarity;
> +    int docID;
> +    Document doc;
> +    String maxTermPrefix;
> +
> +    DocState(DocumentsWriterPerThread docWriter) {
> +      this.docWriter = docWriter;
> +    }
> +
> +    // Only called by asserts
> +    public boolean testPoint(String name) {
> +      return docWriter.writer.testPoint(name);
> +    }
> +  }
> +
> +  /** Called if we hit an exception at a bad time (when
> +   *  updating the index files) and must discard all
> +   *  currently buffered docs.  This resets our state,
> +   *  discarding any docs added since last flush. */
> +  void abort() throws IOException {
> +    try {
> +      if (infoStream != null) {
> +        message("docWriter: now abort");
> +      }
> +      try {
> +        consumer.abort();
> +      } catch (Throwable t) {
> +      }
> +
> +      docStoreSegment = null;
> +      numDocsInStore = 0;
> +      docStoreOffset = 0;
> +
> +      // Reset all postings data
> +      doAfterFlush();
> +
> +    } finally {
> +      aborting = false;
> +      if (infoStream != null) {
> +        message("docWriter: done abort");
> +      }
> +    }
> +  }
> +
> +
> +  final DocumentsWriterRAMAllocator ramAllocator = new DocumentsWriterRAMAllocator();
> +
> +  final DocumentsWriter parent;
> +  final IndexWriter writer;
> +
> +  final Directory directory;
> +  final DocState docState;
> +  final DocConsumer consumer;
> +  private DocFieldProcessor docFieldProcessor;
> +
> +  String segment;                         // Current segment we are working on
> +  private String docStoreSegment;         // Current doc-store segment we are writing
> +  private int docStoreOffset;                     // Current starting doc-store offset of current segment
> +  boolean aborting;               // True if an abort is pending
> +
> +  private final PrintStream infoStream;
> +  private int numDocsInRAM;
> +  private int numDocsInStore;
> +  private int flushedDocCount;
> +  SegmentWriteState flushState;
> +
> +  long[] sequenceIDs = new long[8];
> +
> +  final List<String> closedFiles = new ArrayList<String>();
> +
> +  long numBytesUsed;
> +
> +  public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
> +    this.directory = directory;
> +    this.parent = parent;
> +    this.writer = parent.indexWriter;
> +    this.infoStream = parent.indexWriter.getInfoStream();
> +    this.docState = new DocState(this);
> +    this.docState.similarity = parent.config.getSimilarity();
> +    this.docState.maxFieldLength = parent.config.getMaxFieldLength();
> +
> +    consumer = indexingChain.getChain(this);
> +    if (consumer instanceof DocFieldProcessor) {
> +      docFieldProcessor = (DocFieldProcessor) consumer;
> +    }
> +
> +  }
> +
> +  void setAborting() {
> +    aborting = true;
> +  }
> +
> +  public void addDocument(Document doc, Analyzer analyzer) throws IOException {
> +    docState.doc = doc;
> +    docState.analyzer = analyzer;
> +    docState.docID = numDocsInRAM;
> +    initSegmentName(false);
> +
> +    final DocWriter perDoc;
> +
> +    boolean success = false;
> +    try {
> +      perDoc = consumer.processDocument();
> +
> +      success = true;
> +    } finally {
> +      if (!success) {
> +        if (!aborting) {
> +          // mark document as deleted
> +          commitDocument(-1);
> +        }
> +      }
> +    }
> +
> +    success = false;
> +    try {
> +      if (perDoc != null) {
> +        perDoc.finish();
> +      }
> +
> +      success = true;
> +    } finally {
> +      if (!success) {
> +        setAborting();
> +      }
> +    }
> +
> +  }
> +
> +  public void commitDocument(long sequenceID) {
> +    if (numDocsInRAM == sequenceIDs.length) {
> +      sequenceIDs = ArrayUtil.grow(sequenceIDs);
> +    }
> +
> +    sequenceIDs[numDocsInRAM] = sequenceID;
> +    numDocsInRAM++;
> +    numDocsInStore++;
> +  }
> +
> +  int getNumDocsInRAM() {
> +    return numDocsInRAM;
> +  }
> +
> +  long getMinSequenceID() {
> +    if (numDocsInRAM == 0) {
> +      return -1;
> +    }
> +    return sequenceIDs[0];
> +  }
> +
> +  /** Returns true if any of the fields in the current
> +  *  buffered docs have omitTermFreqAndPositions==false */
> +  boolean hasProx() {
> +    return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.hasProx()
> +                                      : true;
> +  }
> +
> +  Codec getCodec() {
> +    return flushState.codec;
> +  }
> +
> +  void initSegmentName(boolean onlyDocStore) {
> +    if (segment == null && (!onlyDocStore || docStoreSegment == null)) {
> +      // this call is synchronized on IndexWriter.segmentInfos
> +      segment = writer.newSegmentName();
> +      assert numDocsInRAM == 0;
> +    }
> +    if (docStoreSegment == null) {
> +      docStoreSegment = segment;
> +      assert numDocsInStore == 0;
> +    }
> +  }
> +
> +
> +  /**
> +   * Ensures segment names are initialized and then replaces flushState with
> +   * a fresh SegmentWriteState built from the current segment, doc store and
> +   * document counters.
> +   *
> +   * @param onlyDocStore forwarded to initSegmentName
> +   */
> +  private void initFlushState(boolean onlyDocStore) {
> +    initSegmentName(onlyDocStore);
> +    flushState = new SegmentWriteState(
> +        infoStream,
> +        directory,
> +        segment,
> +        docFieldProcessor.fieldInfos,
> +        docStoreSegment,
> +        numDocsInRAM,
> +        numDocsInStore,
> +        writer.getConfig().getTermIndexInterval(),
> +        writer.codecs);
> +  }
> +
> +  /** Reset after a flush: clears the buffered doc count and the
> +   *  current segment name. */
> +  private void doAfterFlush() throws IOException {
> +    numDocsInRAM = 0;
> +    segment = null;
> +  }
> +
> +  /** Flush all pending docs to a new segment.
> +   *
> +   *  Steps, in order: initialize the flush state, remember the current
> +   *  doc store offset, optionally close the doc store, let the consumer
> +   *  flush, then build the SegmentInfo that is returned.  If anything in
> +   *  the try block throws, this instance is marked as aborting via
> +   *  setAborting().
> +   *
> +   *  NOTE(review): closeDocStore() itself calls initFlushState(true), so
> +   *  when closeDocStore is true the flushState used after that call is the
> +   *  one created inside closeDocStore() -- confirm this is intended.
> +   *
> +   *  @param closeDocStore if true the shared doc store is closed as part
> +   *         of this flush
> +   *  @return the SegmentInfo describing the flushed segment, carrying the
> +   *          min and max sequence IDs of the flushed documents
> +   *  @throws IOException propagated from the consumer or doc store */
> +  SegmentInfo flush(boolean closeDocStore) throws IOException {
> +    assert numDocsInRAM>  0;
> +
> +    initFlushState(closeDocStore);
> +
> +    docStoreOffset = numDocsInStore;
> +
> +    if (infoStream != null) {
> +      message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
> +    }
> +
> +    boolean success = false;
> +
> +    try {
> +
> +      if (closeDocStore) {
> +        assert flushState.docStoreSegmentName != null;
> +        assert flushState.docStoreSegmentName.equals(flushState.segmentName);
> +        closeDocStore();
> +        flushState.numDocsInStore = 0;
> +      }
> +
> +      consumer.flush(flushState);
> +
> +      if (infoStream != null) {
> +        /* This SegmentInfo is only used to report the flushed size; a
> +           separate instance is created below for the return value. */
> +        SegmentInfo si = new SegmentInfo(flushState.segmentName,
> +            flushState.numDocs,
> +            directory, false,
> +            docStoreOffset, flushState.docStoreSegmentName,
> +            false,
> +            hasProx(),
> +            getCodec());
> +
> +        final long newSegmentSize = si.sizeInBytes();
> +        String message = "  ramUsed=" + ramAllocator.nf.format(((double) numBytesUsed)/1024./1024.) + " MB" +
> +          " newFlushedSize=" + newSegmentSize +
> +          " docs/MB=" + ramAllocator.nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
> +          " new/old=" + ramAllocator.nf.format(100.0*newSegmentSize/numBytesUsed) + "%";
> +        message(message);
> +      }
> +
> +      flushedDocCount += flushState.numDocs;
> +
> +      /* Must be read before doAfterFlush(), which resets numDocsInRAM to 0. */
> +      long maxSequenceID = sequenceIDs[numDocsInRAM-1];
> +      doAfterFlush();
> +
> +      // Create new SegmentInfo, but do not add to our
> +      // segmentInfos until deletes are flushed
> +      // successfully.
> +      SegmentInfo newSegment = new SegmentInfo(flushState.segmentName,
> +                                   flushState.numDocs,
> +                                   directory, false,
> +                                   docStoreOffset, flushState.docStoreSegmentName,
> +                                   false,
> +                                   hasProx(),
> +                                   getCodec());
> +
> +
> +      newSegment.setMinSequenceID(sequenceIDs[0]);
> +      newSegment.setMaxSequenceID(maxSequenceID);
> +
> +      IndexWriter.setDiagnostics(newSegment, "flush");
> +      success = true;
> +
> +      return newSegment;
> +    } finally {
> +      if (!success) {
> +        setAborting();
> +      }
> +    }
> +  }
> +
> +  /** Closes the current open doc stores an returns the doc
> +   *  store segment name.  This returns null if there are *
> +   *  no buffered documents. */
> +  String closeDocStore() throws IOException {
> +
> +    // nocommit
> +//    if (infoStream != null)
> +//      message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
> +
> +    boolean success = false;
> +
> +    try {
> +      initFlushState(true);
> +      closedFiles.clear();
> +
> +      consumer.closeDocStore(flushState);
> +      // nocommit
> +      //assert 0 == openFiles.size();
> +
> +      String s = docStoreSegment;
> +      docStoreSegment = null;
> +      docStoreOffset = 0;
> +      numDocsInStore = 0;
> +      success = true;
> +      return s;
> +    } finally {
> +      if (!success) {
> +        parent.abort();
> +      }
> +    }
> +  }
> +
> +
> +  /** Returns the name of the segment currently being written;
> +   *  doAfterFlush() resets it to null. */
> +  String getSegment() {
> +    return this.segment;
> +  }
> +
> +  /** Returns the name of the doc store segment we are
> +   *  currently writing to. */
> +  String getDocStoreSegment() {
> +    return this.docStoreSegment;
> +  }
> +
> +  /** Returns the offset of the currently buffered docs
> +   *  within the shared doc store. */
> +  int getDocStoreOffset() {
> +    return this.docStoreOffset;
> +  }
> +
> +
> +  /**
> +   * Returns a snapshot copy of the names of the files closed so far.
> +   * Later changes to the internal list are not reflected in the result.
> +   * The copy constructor is used instead of clone() so no unchecked cast
> +   * is needed and the field is not required to be an ArrayList at runtime.
> +   *
> +   * @return a new list holding the current closed file names
> +   */
> +  List<String> closedFiles() {
> +    return new ArrayList<String>(closedFiles);
> +  }
> +
> +  /** Registers name in the parent's openFiles collection while holding
> +   *  that collection's monitor.  With assertions enabled, the name must
> +   *  not already be registered. */
> +  void addOpenFile(String name) {
> +    synchronized(parent.openFiles) {
> +      assert !parent.openFiles.contains(name);
> +      parent.openFiles.add(name);
> +    }
> +  }
> +
> +  /** Removes name from the parent's openFiles collection while holding
> +   *  that collection's monitor, then appends it to this instance's
> +   *  closedFiles list (outside the synchronized block).  With assertions
> +   *  enabled, the name must currently be registered as open. */
> +  void removeOpenFile(String name) {
> +    synchronized(parent.openFiles) {
> +      assert parent.openFiles.contains(name);
> +      parent.openFiles.remove(name);
> +    }
> +    closedFiles.add(name);
> +  }
> +
> +  /** Returned by the consumer for each document.  It carries
> +   *  any state that must be flushed synchronized "in docID
> +   *  order"; instances are gathered and flushed in order.
> +   *  Instances can be chained through the next field. */
> +  abstract static class DocWriter {
> +    int docID;
> +    DocWriter next;
> +
> +    void setNext(DocWriter next) {
> +      this.next = next;
> +    }
> +
> +    abstract long sizeInBytes();
> +    abstract void finish() throws IOException;
> +    abstract void abort();
> +  }
> +
> +  /**
> +   * Create and return a new DocWriterBuffer.
> +   */
> +  PerDocBuffer newPerDocBuffer() {
> +    return new PerDocBuffer();
> +  }
> +
> +  /**
> +   * RAMFile buffer for DocWriters.
> +   */
> +  class PerDocBuffer extends RAMFile {
> +
> +    /**
> +     * Allocate bytes used from shared pool.
> +     */
> +    protected byte[] newBuffer(int size) {
> +      assert size == DocumentsWriterRAMAllocator.PER_DOC_BLOCK_SIZE;
> +      return ramAllocator.perDocAllocator.getByteBlock();
> +    }
> +
> +    /**
> +     * Recycle the bytes used.
> +     */
> +    synchronized void recycle() {
> +      if (buffers.size()>  0) {
> +        setLength(0);
> +
> +        // Recycle the blocks
> +        ramAllocator.perDocAllocator.recycleByteBlocks(buffers);
> +        buffers.clear();
> +        sizeInBytes = 0;
> +
> +        assert numBuffers() == 0;
> +      }
> +    }
> +  }
> +
> +  /** Forwards the given byte count to the RAM allocator's accounting. */
> +  void bytesUsed(long numBytes) {
> +    this.ramAllocator.bytesUsed(numBytes);
> +  }
> +
> +  /** Passes the message, prefixed with "DW: ", to the writer; does
> +   *  nothing when no infoStream is set. */
> +  void message(String message) {
> +    if (infoStream == null) {
> +      return;
> +    }
> +    writer.message("DW: " + message);
> +  }
> +}
>
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,148 @@
> +package org.apache.lucene.index;
> +
> +import java.text.NumberFormat;
> +import java.util.ArrayList;
> +import java.util.List;
> +
> +import org.apache.lucene.util.Constants;
> +
> +class DocumentsWriterRAMAllocator {
> +  final ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
> +  final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
> +
> +
> +  class ByteBlockAllocator extends ByteBlockPool.Allocator {
> +    final int blockSize;
> +
> +    ByteBlockAllocator(int blockSize) {
> +      this.blockSize = blockSize;
> +    }
> +
> +    ArrayList<byte[]>  freeByteBlocks = new ArrayList<byte[]>();
> +
> +    /* Allocate another byte[] from the shared pool */
> +    @Override
> +    byte[] getByteBlock() {
> +      final int size = freeByteBlocks.size();
> +      final byte[] b;
> +      if (0 == size) {
> +        b = new byte[blockSize];
> +        // Always record a block allocated, even if
> +        // trackAllocations is false.  This is necessary
> +        // because this block will be shared between
> +        // things that don't track allocations (term
> +        // vectors) and things that do (freq/prox
> +        // postings).
> +        numBytesUsed += blockSize;
> +      } else
> +        b = freeByteBlocks.remove(size-1);
> +      return b;
> +    }
> +
> +    /* Return byte[]'s to the pool */
> +    @Override
> +    void recycleByteBlocks(byte[][] blocks, int start, int end) {
> +      for(int i=start;i<end;i++) {
> +        freeByteBlocks.add(blocks[i]);
> +      }
> +    }
> +
> +    @Override
> +    void recycleByteBlocks(List<byte[]>  blocks) {
> +      final int size = blocks.size();
> +      for(int i=0;i<size;i++) {
> +        freeByteBlocks.add(blocks.get(i));
> +      }
> +    }
> +  }
> +
> +  private ArrayList<int[]>  freeIntBlocks = new ArrayList<int[]>();
> +
> +  /* Allocate another int[] from the shared pool */
> +  int[] getIntBlock() {
> +    final int size = freeIntBlocks.size();
> +    final int[] b;
> +    if (0 == size) {
> +      b = new int[INT_BLOCK_SIZE];
> +      // Always record a block allocated, even if
> +      // trackAllocations is false.  This is necessary
> +      // because this block will be shared between
> +      // things that don't track allocations (term
> +      // vectors) and things that do (freq/prox
> +      // postings).
> +      numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
> +    } else
> +      b = freeIntBlocks.remove(size-1);
> +    return b;
> +  }
> +
> +  void bytesUsed(long numBytes) {
> +    numBytesUsed += numBytes;
> +  }
> +
> +  /* Return int[]s to the pool */
> +  void recycleIntBlocks(int[][] blocks, int start, int end) {
> +    for(int i=start;i<end;i++)
> +      freeIntBlocks.add(blocks[i]);
> +  }
> +
> +  long getRAMUsed() {
> +    return numBytesUsed;
> +  }
> +
> +  long numBytesUsed;
> +
> +  NumberFormat nf = NumberFormat.getInstance();
> +
> +  final static int PER_DOC_BLOCK_SIZE = 1024;
> +
> +  // Coarse estimates used to measure RAM usage of buffered deletes
> +  final static int OBJECT_HEADER_BYTES = 8;
> +  final static int POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4;
> +  final static int INT_NUM_BYTE = 4;
> +  final static int CHAR_NUM_BYTE = 2;
> +
> +  /* Rough logic: HashMap has an array[Entry] w/ varying
> +     load factor (say 2 * POINTER).  Entry is object w/ Term
> +     key, BufferedDeletes.Num val, int hash, Entry next
> +     (OBJ_HEADER + 3*POINTER + INT).  Term is object w/
> +     String field and String text (OBJ_HEADER + 2*POINTER).
> +     We don't count Term's field since it's interned.
> +     Term's text is String (OBJ_HEADER + 4*INT + POINTER +
> +     OBJ_HEADER + string.length*CHAR).  BufferedDeletes.num is
> +     OBJ_HEADER + INT. */
> +
> +  final static int BYTES_PER_DEL_TERM = 8*POINTER_NUM_BYTE + 5*OBJECT_HEADER_BYTES + 6*INT_NUM_BYTE;
> +
> +  /* Rough logic: del docIDs are List<Integer>.  Say list
> +     allocates ~2X size (2*POINTER).  Integer is OBJ_HEADER
> +     + int */
> +  final static int BYTES_PER_DEL_DOCID = 2*POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE;
> +
> +  /* Rough logic: HashMap has an array[Entry] w/ varying
> +     load factor (say 2 * POINTER).  Entry is object w/
> +     Query key, Integer val, int hash, Entry next
> +     (OBJ_HEADER + 3*POINTER + INT).  Query we often
> +     undercount (say 24 bytes).  Integer is OBJ_HEADER + INT. */
> +  final static int BYTES_PER_DEL_QUERY = 5*POINTER_NUM_BYTE + 2*OBJECT_HEADER_BYTES + 2*INT_NUM_BYTE + 24;
> +
> +  /* Initial chunks size of the shared byte[] blocks used to
> +     store postings data */
> +  final static int BYTE_BLOCK_SHIFT = 15;
> +  final static int BYTE_BLOCK_SIZE = 1<<  BYTE_BLOCK_SHIFT;
> +  final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
> +  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
> +
> +  final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
> +
> +  /* Initial chunks size of the shared int[] blocks used to
> +     store postings data */
> +  final static int INT_BLOCK_SHIFT = 13;
> +  final static int INT_BLOCK_SIZE = 1<<  INT_BLOCK_SHIFT;
> +  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
> +
> +  String toMB(long v) {
> +    return nf.format(v/1024./1024.);
> +  }
> +
> +}
>
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,255 @@
> +package org.apache.lucene.index;
> +
> +import java.io.IOException;
> +import java.util.Iterator;
> +import java.util.concurrent.locks.Condition;
> +import java.util.concurrent.locks.Lock;
> +import java.util.concurrent.locks.ReentrantLock;
> +
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.util.ThreadInterruptedException;
> +
> +abstract class DocumentsWriterThreadPool {
> +  public static abstract class Task<T>  {
> +    private boolean clearThreadBindings = false;
> +
> +    protected void clearThreadBindings() {
> +      this.clearThreadBindings = true;
> +    }
> +
> +    boolean doClearThreadBindings() {
> +      return clearThreadBindings;
> +    }
> +  }
> +
> +  public static abstract class PerThreadTask<T>  extends Task<T>  {
> +    abstract T process(final DocumentsWriterPerThread perThread) throws IOException;
> +  }
> +
> +  public static abstract class AllThreadsTask<T>  extends Task<T>  {
> +    abstract T process(final Iterator<DocumentsWriterPerThread>  threadsIterator) throws IOException;
> +  }
> +
> +  protected abstract static class ThreadState {
> +    private DocumentsWriterPerThread perThread;
> +    private boolean isIdle = true;
> +
> +    void start() {/* extension hook */}
> +    void finish() {/* extension hook */}
> +  }
> +
> +  private int pauseThreads = 0;
> +
> +  protected final int maxNumThreadStates;
> +  protected ThreadState[] allThreadStates = new ThreadState[0];
> +
> +  private final Lock lock = new ReentrantLock();
> +  private final Condition threadStateAvailable = lock.newCondition();
> +  private boolean globalLock;
> +  private boolean aborting;
> +
> +  DocumentsWriterThreadPool(int maxNumThreadStates) {
> +    this.maxNumThreadStates = (maxNumThreadStates<  1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumThreadStates;
> +  }
> +
> +  public final int getMaxThreadStates() {
> +    return this.maxNumThreadStates;
> +  }
> +
> +  void pauseAllThreads() {
> +    lock.lock();
> +    try {
> +      pauseThreads++;
> +      while(!allThreadsIdle()) {
> +        try {
> +          threadStateAvailable.await();
> +        } catch (InterruptedException ie) {
> +          throw new ThreadInterruptedException(ie);
> +        }
> +      }
> +    } finally {
> +      lock.unlock();
> +    }
> +  }
> +
> +  void resumeAllThreads() {
> +    lock.lock();
> +    try {
> +      pauseThreads--;
> +      assert pauseThreads>= 0;
> +      if (0 == pauseThreads) {
> +        threadStateAvailable.signalAll();
> +      }
> +    } finally {
> +      lock.unlock();
> +    }
> +  }
> +
> +  private boolean allThreadsIdle() {
> +    for (ThreadState state : allThreadStates) {
> +      if (!state.isIdle) {
> +        return false;
> +      }
> +    }
> +
> +    return true;
> +  }
> +
> +  void abort() throws IOException {
> +    pauseAllThreads();
> +    aborting = true;
> +    for (ThreadState state : allThreadStates) {
> +      state.perThread.abort();
> +    }
> +  }
> +
> +  void finishAbort() {
> +    aborting = false;
> +    resumeAllThreads();
> +  }
> +
> +  public<T>  T executeAllThreads(AllThreadsTask<T>  task) throws IOException {
> +    T result = null;
> +
> +    lock.lock();
> +    try {
> +      try {
> +        while (globalLock) {
> +          threadStateAvailable.await();
> +        }
> +      } catch (InterruptedException ie) {
> +        throw new ThreadInterruptedException(ie);
> +      }
> +
> +      globalLock = true;
> +      pauseAllThreads();
> +    } finally {
> +      lock.unlock();
> +    }
> +
> +
> +    // all threads are idle now
> +
> +    try {
> +      final ThreadState[] localAllThreads = allThreadStates;
> +
> +      result = task.process(new Iterator<DocumentsWriterPerThread>() {
> +        int i = 0;
> +
> +        @Override
> +        public boolean hasNext() {
> +          return i<  localAllThreads.length;
> +        }
> +
> +        @Override
> +        public DocumentsWriterPerThread next() {
> +          return localAllThreads[i++].perThread;
> +        }
> +
> +        @Override
> +        public void remove() {
> +          throw new UnsupportedOperationException("remove() not supported.");
> +        }
> +      });
> +      return result;
> +    } finally {
> +      lock.lock();
> +      try {
> +        try {
> +          if (task.doClearThreadBindings()) {
> +            clearAllThreadBindings();
> +          }
> +        } finally {
> +          globalLock = false;
> +          resumeAllThreads();
> +          threadStateAvailable.signalAll();
> +        }
> +      } finally {
> +        lock.unlock();
> +      }
> +
> +    }
> +  }
> +
> +
> +  public final<T>  T executePerThread(DocumentsWriter documentsWriter, Document doc, PerThreadTask<T>  task) throws IOException {
> +    ThreadState state = acquireThreadState(documentsWriter, doc);
> +    boolean success = false;
> +    try {
> +      T result = task.process(state.perThread);
> +      success = true;
> +      return result;
> +    } finally {
> +      boolean abort = false;
> +      if (!success&&  state.perThread.aborting) {
> +        state.perThread.aborting = false;
> +        abort = true;
> +      }
> +
> +      returnDocumentsWriterPerThread(state, task.doClearThreadBindings());
> +
> +      if (abort) {
> +        documentsWriter.abort();
> +      }
> +    }
> +  }
> +
> +  protected final<T extends ThreadState>  T addNewThreadState(DocumentsWriter documentsWriter, T threadState) {
> +    // Just create a new "private" thread state
> +    ThreadState[] newArray = new ThreadState[1+allThreadStates.length];
> +    if (allThreadStates.length>  0)
> +      System.arraycopy(allThreadStates, 0, newArray, 0, allThreadStates.length);
> +    threadState.perThread = documentsWriter.newDocumentsWriterPerThread();
> +    newArray[allThreadStates.length] = threadState;
> +
> +    allThreadStates = newArray;
> +    return threadState;
> +  }
> +
> +  protected abstract ThreadState selectThreadState(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
> +  protected void clearThreadBindings(ThreadState flushedThread) {
> +    // subclasses can optionally override this to cleanup after a thread flushed
> +  }
> +
> +  protected void clearAllThreadBindings() {
> +    // subclasses can optionally override this to cleanup after a thread flushed
> +  }
> +
> +
> +  private final ThreadState acquireThreadState(DocumentsWriter documentsWriter, Document doc) {
> +    lock.lock();
> +    try {
> +      ThreadState threadState = selectThreadState(Thread.currentThread(), documentsWriter, doc);
> +
> +      try {
> +        while (!threadState.isIdle || globalLock || aborting) {
> +          threadStateAvailable.await();
> +        }
> +      } catch (InterruptedException ie) {
> +        throw new ThreadInterruptedException(ie);
> +      }
> +
> +      threadState.isIdle = false;
> +      threadState.start();
> +
> +      return threadState;
> +
> +    } finally {
> +      lock.unlock();
> +    }
> +  }
> +
> +  private final void returnDocumentsWriterPerThread(ThreadState state, boolean clearThreadBindings) {
> +    lock.lock();
> +    try {
> +      state.finish();
> +      if (clearThreadBindings) {
> +        clearThreadBindings(state);
> +      }
> +      state.isIdle = true;
> +      threadStateAvailable.signalAll();
> +    } finally {
> +      lock.unlock();
> +    }
> +  }
> +}
>
>
>


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]