design change suggestion

design change suggestion

Stanislav Jordanov
Hi guys,

For the purposes of our product we've devised a bunch of small tool
classes which handle various utility tasks, such as:
1. IndexRecoverer - assuming the "segments" file is missing or
corrupted, this tool rebuilds it based on the *.cfs (and other) files
found in the index dir, excluding the files listed in 'deletable' (a
rough sketch of the idea appears after this list)

2. IndexSplitter - splits an existing index into 2, 3 or more roughly
equally sized indices. It simply distributes the segment files across
distinct directories and then uses the IndexRecoverer to rebuild each
new index's segments file

3. IndexMerger - the reverse of IndexSplitter: merges several indices
into a single index. It uses a modified version of IndexWriter.addIndexes
that does not optimize() at the beginning or at the end. This way the
resulting index is not a single huge cfs file, which is desirable in
some cases.

4. IndexOptimizer - optimizes an existing index by merging the 'small'
segments and compacting the large ones (compacting means 'removing the
deleted docs within them'). It also converts to the compound file format
any old-style "spilled" segments.
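
To make the IndexRecoverer idea concrete, here is a rough, untested
sketch of the recovery step (not our exact code). It assumes the
pre-2.1 file format, where a segment's doc count can be derived from
its fields index (the .fdx file holds one 8-byte pointer per document),
and it must live in org.apache.lucene.index to reach the package-scope
SegmentInfos API:

package org.apache.lucene.index;

import java.io.IOException;
import org.apache.lucene.store.Directory;

// Sketch only: rebuild the "segments" file from the compound (*.cfs)
// segments found in a directory. A real tool would also handle
// non-compound segments and skip the names listed in 'deletable'.
public class IndexRecovererSketch {
  public static void rebuildSegmentsFile(Directory dir) throws IOException {
    SegmentInfos infos = new SegmentInfos();
    String[] files = dir.list();
    for (int i = 0; i < files.length; i++) {
      if (!files[i].endsWith(".cfs"))
        continue;
      String segment = files[i].substring(0, files[i].length() - 4);
      // the fields index inside the .cfs has one 8-byte entry per doc
      CompoundFileReader cfs = new CompoundFileReader(dir, files[i]);
      int docCount = (int) (cfs.fileLength(segment + ".fdx") / 8);
      cfs.close();
      infos.addElement(new SegmentInfo(segment, docCount, dir));
      // keep the name counter ahead of recovered names ("_" + base-36)
      int n = Integer.parseInt(segment.substring(1), Character.MAX_RADIX);
      if (n >= infos.counter)
        infos.counter = n + 1;
    }
    infos.write(dir);  // writes a fresh "segments" file
  }
}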

All of the above-mentioned tools are classes within the
org.apache.lucene.index package, as they use some package-scope methods
and properties (plus, they feel like they belong there).

Now the design change suggestion - it concerns the 'deletable'-related code.
According to the source comments, the delayed deletion of files
through 'deletable' is required on Windows only, as this OS prevents
files that are opened for reading from being deleted.
Working on the IndexOptimizer tool I found myself in a situation where I
needed to 'safe delete' a bunch of obsolete segments while having only
an (FS)Directory and a segment file name. But the 'safe delete' feature
is in IndexWriter. After reviewing the code I came to the conclusion
that the 'safe delete' feature logically belongs to the (FS)Directory
class, not to IndexWriter, so I moved the corresponding code from
IndexWriter to (FS)Directory; IMO this way is better.
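
In short, the moved logic looks roughly like this (a condensed sketch
with assumed helper names, modeled on the pre-2.1 deferred-deletion
scheme, not the exact code in the attachment below):

// Inside FSDirectory (sketch; readDeletableNames/writeDeletableNames are
// assumed helpers that persist names in the "deletable" file).
public synchronized void deleteFiles(Vector files) throws IOException {
  Vector deletable = new Vector();
  tryDelete(readDeletableNames(), deletable); // retry earlier failures first
  tryDelete(files, deletable);                // then the new candidates
  writeDeletableNames(deletable);             // remember whatever still failed
}

private void tryDelete(Vector names, Vector deletable) {
  for (int i = 0; i < names.size(); i++) {
    String name = (String) names.elementAt(i);
    try {
      deleteFile(name);             // fails on Windows if the file is open
    } catch (IOException e) {
      if (fileExists(name))
        deletable.addElement(name); // queue for a retry on the next call
    }
  }
}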
I am attaching the (2.0.0-based) modified sources of IndexWriter and
(FS)Directory for your consideration. (Disclaimer: I can't guarantee my
changes are bug-free.)

Best regards,
Stanislav

package org.apache.lucene.index;

/**
 * Copyright 2004 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.RAMDirectory;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Vector;


/**
  An IndexWriter creates and maintains an index.

  The third argument to the
  <a href="#IndexWriter(org.apache.lucene.store.Directory, org.apache.lucene.analysis.Analyzer, boolean)"><b>constructor</b></a>
  determines whether a new index is created, or whether an existing index is
  opened for the addition of new documents.

  In either case, documents are added with the <a
  href="#addDocument(org.apache.lucene.document.Document)"><b>addDocument</b></a> method.  
  When finished adding documents, <a href="#close()"><b>close</b></a> should be called.

  <p>If an index will not have more documents added for a while and optimal search
  performance is desired, then the <a href="#optimize()"><b>optimize</b></a>
  method should be called before the index is closed.
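 
  <p>A minimal usage sketch (illustrative only; assumes a
  StandardAnalyzer and an example path):
  <pre>
    IndexWriter writer =
        new IndexWriter("/tmp/index", new StandardAnalyzer(), true);
    writer.addDocument(doc);   // repeat per document
    writer.optimize();         // optional, before closing
    writer.close();
  </pre>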
 
  <p>Opening an IndexWriter creates a lock file for the directory in use. Trying to open
  another IndexWriter on the same directory will lead to an IOException. The IOException
  is also thrown if an IndexReader on the same directory is used to delete documents
  from the index.
 
  @see IndexModifier IndexModifier supports the important methods of IndexWriter plus deletion
  */

public class IndexWriter {

  /**
   * Default value for the write lock timeout (1,000).
   */
  public final static long WRITE_LOCK_TIMEOUT = 1000;

  private long writeLockTimeout = WRITE_LOCK_TIMEOUT;

  /**
   * Default value for the commit lock timeout (10,000).
   */
  public final static long COMMIT_LOCK_TIMEOUT = 10000;

  private long commitLockTimeout = COMMIT_LOCK_TIMEOUT;

  static final String WRITE_LOCK_NAME = "write.lock";
  static final String COMMIT_LOCK_NAME = "commit.lock";

  /**
   * Default value is 7 in this modified version (stock Lucene default: 10).
   * Change using {@link #setMergeFactor(int)}.
   */
  public final static int DEFAULT_MERGE_FACTOR = 7; //10

  /**
   * Default value is 20 in this modified version (stock Lucene default: 10).
   * Change using {@link #setMaxBufferedDocs(int)}.
   */
  public final static int DEFAULT_MAX_BUFFERED_DOCS = 20; //10

  // deprecated DEFAULT_MIN_MERGE_DOCS removed; use DEFAULT_MAX_BUFFERED_DOCS instead
  //public final static int DEFAULT_MIN_MERGE_DOCS = DEFAULT_MAX_BUFFERED_DOCS;

  /**
   * Default value is 20 * 7^4 = 48,020 in this modified version (stock
   * Lucene default: {@link Integer#MAX_VALUE}). Change using {@link #setMaxMergeDocs(int)}.
   */
  public final static int DEFAULT_MAX_MERGE_DOCS = 20 * 7 * 7 * 7 * 7; //Integer.MAX_VALUE

  /**
   * Default value is 10,000,000 in this modified version (stock Lucene
   * default: 10,000). Change using {@link #setMaxFieldLength(int)}.
   */
  public final static int DEFAULT_MAX_FIELD_LENGTH = 10000000;

  /**
   * Default value is 128. Change using {@link #setTermIndexInterval(int)}.
   */
  public final static int DEFAULT_TERM_INDEX_INTERVAL = 128;
 
  private Directory directory;  // where this index resides
  private Analyzer analyzer;    // how to analyze text

  private Similarity similarity = Similarity.getDefault(); // how to normalize

  private SegmentInfos segmentInfos = new SegmentInfos(); // the segments
  private final Directory ramDirectory = new RAMDirectory(); // for temp segs

  private Lock writeLock;

  private int termIndexInterval = DEFAULT_TERM_INDEX_INTERVAL;

  /** Use compound file setting. Defaults to true, minimizing the number of
   * files used.  Setting this to false may improve indexing performance, but
   * may also cause file handle problems.
   */
  private boolean useCompoundFile = true;
 
  private boolean closeDir;

  /** Get the current setting of whether to use the compound file format.
   *  Note that this just returns the value you set with setUseCompoundFile(boolean)
   *  or the default. You cannot use this to query the status of an existing index.
   *  @see #setUseCompoundFile(boolean)
   */
  public boolean getUseCompoundFile() {
    return useCompoundFile;
  }

  /** Setting to turn on usage of a compound file. When on, multiple files
   *  for each segment are merged into a single file once the segment creation
   *  is finished. This is done regardless of what directory is in use.
   */
  public void setUseCompoundFile(boolean value) {
    useCompoundFile = value;
  }

  /** Expert: Set the Similarity implementation used by this IndexWriter.
   *
   * @see Similarity#setDefault(Similarity)
   */
  public void setSimilarity(Similarity similarity) {
    this.similarity = similarity;
  }

  /** Expert: Return the Similarity implementation used by this IndexWriter.
   *
   * <p>This defaults to the current value of {@link Similarity#getDefault()}.
   */
  public Similarity getSimilarity() {
    return this.similarity;
  }

  /** Expert: Set the interval between indexed terms.  Large values cause less
   * memory to be used by IndexReader, but slow random-access to terms.  Small
   * values cause more memory to be used by an IndexReader, and speed
   * random-access to terms.
   *
   * This parameter determines the amount of computation required per query
   * term, regardless of the number of documents that contain that term.  In
   * particular, it is the maximum number of other terms that must be
   * scanned before a term is located and its frequency and position information
   * may be processed.  In a large index with user-entered query terms, query
   * processing time is likely to be dominated not by term lookup but rather
   * by the processing of frequency and positional data.  In a small index
   * or when many uncommon query terms are generated (e.g., by wildcard
   * queries) term lookup may become a dominant cost.
   *
   * In particular, <code>numUniqueTerms/interval</code> terms are read into
   * memory by an IndexReader, and, on average, <code>interval/2</code> terms
   * must be scanned for each random term access.  (For example, with one
   * million unique terms and the default interval of 128, roughly 7,800
   * index terms are held in memory and about 64 terms are scanned per lookup.)
   *
   * @see #DEFAULT_TERM_INDEX_INTERVAL
   */
  public void setTermIndexInterval(int interval) {
    this.termIndexInterval = interval;
  }

  /** Expert: Return the interval between indexed terms.
   *
   * @see #setTermIndexInterval(int)
   */
  public int getTermIndexInterval() { return termIndexInterval; }

  /**
   * Constructs an IndexWriter for the index in <code>path</code>.
   * Text will be analyzed with <code>a</code>.  If <code>create</code>
   * is true, then a new, empty index will be created in
   * <code>path</code>, replacing the index already there, if any.
   *
   * @param path the path to the index directory
   * @param a the analyzer to use
   * @param create <code>true</code> to create the index or overwrite
   *  the existing one; <code>false</code> to append to the existing
   *  index
   * @throws IOException if the directory cannot be read/written to, or
   *  if it does not exist, and <code>create</code> is
   *  <code>false</code>
   */
  public IndexWriter(String path, Analyzer a, boolean create)
       throws IOException {
    this(FSDirectory.getDirectory(path, create), a, create, true);
  }

  /**
   * Constructs an IndexWriter for the index in <code>path</code>.
   * Text will be analyzed with <code>a</code>.  If <code>create</code>
   * is true, then a new, empty index will be created in
   * <code>path</code>, replacing the index already there, if any.
   *
   * @param path the path to the index directory
   * @param a the analyzer to use
   * @param create <code>true</code> to create the index or overwrite
   *  the existing one; <code>false</code> to append to the existing
   *  index
   * @throws IOException if the directory cannot be read/written to, or
   *  if it does not exist, and <code>create</code> is
   *  <code>false</code>
   */
  public IndexWriter(File path, Analyzer a, boolean create)
       throws IOException {
    this(FSDirectory.getDirectory(path, create), a, create, true);
  }

  /**
   * Constructs an IndexWriter for the index in <code>d</code>.
   * Text will be analyzed with <code>a</code>.  If <code>create</code>
   * is true, then a new, empty index will be created in
   * <code>d</code>, replacing the index already there, if any.
   *
   * @param d the index directory
   * @param a the analyzer to use
   * @param create <code>true</code> to create the index or overwrite
   *  the existing one; <code>false</code> to append to the existing
   *  index
   * @throws IOException if the directory cannot be read/written to, or
   *  if it does not exist, and <code>create</code> is
   *  <code>false</code>
   */
  public IndexWriter(Directory d, Analyzer a, boolean create)
       throws IOException {
    this(d, a, create, false);
  }
 
  /** Constructs an IndexWriter on <code>d</code>; if <code>closeDir</code> is
   * true, the directory is closed when this writer is closed. */
  public IndexWriter(Directory d, Analyzer a, final boolean create, boolean closeDir)
    throws IOException {
      this.closeDir = closeDir;
      directory = d;
      analyzer = a;

      Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME);
      if (!writeLock.obtain(writeLockTimeout)) // obtain write lock
        throw new IOException("Index locked for write: " + writeLock);
      this.writeLock = writeLock;                   // save it

      synchronized (directory) {        // in- & inter-process sync
        new Lock.With(directory.makeLock(IndexWriter.COMMIT_LOCK_NAME), commitLockTimeout) {
            public Object doBody() throws IOException {
              if (create)
                segmentInfos.write(directory);
              else
                segmentInfos.read(directory);
              return null;
            }
          }.run();
      }
  }

  /** Determines the largest number of documents ever merged by addDocument().
   * Small values (e.g., less than 10,000) are best for interactive indexing,
   * as this limits the length of pauses while indexing to a few seconds.
   * Larger values are best for batched indexing and speedier searches.
   *
   * <p>The default value is {@link #DEFAULT_MAX_MERGE_DOCS}.
   */
  public void setMaxMergeDocs(int maxMergeDocs) {
    this.maxMergeDocs = maxMergeDocs;
  }

  /**
   * @see #setMaxMergeDocs
   */
  public int getMaxMergeDocs() {
    return maxMergeDocs;
  }

  /**
   * The maximum number of terms that will be indexed for a single field in a
   * document.  This limits the amount of memory required for indexing, so that
   * collections with very large files will not crash the indexing process by
   * running out of memory.<p/>
   * Note that this effectively truncates large documents, excluding from the
   * index terms that occur further in the document.  If you know your source
   * documents are large, be sure to set this value high enough to accommodate
   * the expected size.  If you set it to Integer.MAX_VALUE, then the only limit
   * is your memory, but you should anticipate an OutOfMemoryError.<p/>
   * By default, no more than {@link #DEFAULT_MAX_FIELD_LENGTH} terms will be indexed for a field.
   */
  public void setMaxFieldLength(int maxFieldLength) {
    this.maxFieldLength = maxFieldLength;
  }
 
  /**
   * @see #setMaxFieldLength
   */
  public int getMaxFieldLength() {
    return maxFieldLength;
  }

  /** Determines the minimal number of documents required before the buffered
   * in-memory documents are merged and a new segment is created.
   * Since Documents are merged in a {@link org.apache.lucene.store.RAMDirectory},
   * a large value gives faster indexing.  At the same time, mergeFactor limits
   * the number of files open in an FSDirectory.
   *
   * <p> The default value is {@link #DEFAULT_MAX_BUFFERED_DOCS}.
   *
   * @throws IllegalArgumentException if maxBufferedDocs is smaller than 2
   */
  public void setMaxBufferedDocs(int maxBufferedDocs) {
    if (maxBufferedDocs < 2)
      throw new IllegalArgumentException("maxBufferedDocs must at least be 2");
    this.minMergeDocs = maxBufferedDocs;
  }

  /**
   * @see #setMaxBufferedDocs
   */
  public int getMaxBufferedDocs() {
    return minMergeDocs;
  }

  /** Determines how often segment indices are merged by addDocument().  With
   * smaller values, less RAM is used while indexing, and searches on
   * unoptimized indices are faster, but indexing speed is slower.  With larger
   * values, more RAM is used during indexing, and while searches on unoptimized
   * indices are slower, indexing is faster.  Thus larger values (> 10) are best
   * for batch index creation, and smaller values (< 10) for indices that are
   * interactively maintained.
   *
   * <p>This must never be less than 2.  The default value is {@link #DEFAULT_MERGE_FACTOR}.
   */
  public void setMergeFactor(int mergeFactor) {
    if (mergeFactor < 2)
      throw new IllegalArgumentException("mergeFactor cannot be less than 2");
    this.mergeFactor = mergeFactor;
  }

  /**
   * @see #setMergeFactor
   */
  public int getMergeFactor() {
    return mergeFactor;
  }

  /** If non-null, information about merges and a message when
   * maxFieldLength is reached will be printed to this.
   */
  public void setInfoStream(PrintStream infoStream) {
    this.infoStream = infoStream;
  }

  /**
   * @see #setInfoStream
   */
  public PrintStream getInfoStream() {
    return infoStream;
  }

  /**
   * Sets the maximum time to wait for a commit lock (in milliseconds).
   */
  public void setCommitLockTimeout(long commitLockTimeout) {
    this.commitLockTimeout = commitLockTimeout;
  }

  /**
   * @see #setCommitLockTimeout
   */
  public long getCommitLockTimeout() {
    return commitLockTimeout;
  }

  /**
   * Sets the maximum time to wait for a write lock (in milliseconds).
   */
  public void setWriteLockTimeout(long writeLockTimeout) {
    this.writeLockTimeout = writeLockTimeout;
  }

  /**
   * @see #setWriteLockTimeout
   */
  public long getWriteLockTimeout() {
    return writeLockTimeout;
  }

  /** Flushes all changes to an index and closes all associated files. */
  public synchronized void close() throws IOException {
    flushRamSegments();
    ramDirectory.close();
    if (writeLock != null) {
      writeLock.release();                          // release write lock
      writeLock = null;
    }
    if(closeDir)
      directory.close();
  }

  /** Release the write lock, if needed. */
  protected void finalize() throws IOException {
    if (writeLock != null) {
      writeLock.release();                        // release write lock
      writeLock = null;
    }
  }

  /** Returns the Directory used by this index. */
  public Directory getDirectory() {
      return directory;
  }

  /** Returns the analyzer used by this index. */
  public Analyzer getAnalyzer() {
      return analyzer;
  }


  /** Returns the number of documents currently in this index. */
  public synchronized int docCount() {
    int count = 0;
    for (int i = 0; i < segmentInfos.size(); i++) {
      SegmentInfo si = segmentInfos.info(i);
      count += si.docCount;
    }
    return count;
  }

  /**
   * The maximum number of terms that will be indexed for a single field in a
   * document.  This limits the amount of memory required for indexing, so that
   * collections with very large files will not crash the indexing process by
   * running out of memory.<p/>
   * Note that this effectively truncates large documents, excluding from the
   * index terms that occur further in the document.  If you know your source
   * documents are large, be sure to set this value high enough to accommodate
   * the expected size.  If you set it to Integer.MAX_VALUE, then the only limit
   * is your memory, but you should anticipate an OutOfMemoryError.<p/>
   * By default, no more than {@link #DEFAULT_MAX_FIELD_LENGTH} terms will be indexed for a field.
   *
   */
  private int maxFieldLength = DEFAULT_MAX_FIELD_LENGTH;

  /**
   * Adds a document to this index.  If the document contains more than
   * {@link #setMaxFieldLength(int)} terms for a given field, the remainder are
   * discarded.
   */
  public void addDocument(Document doc) throws IOException {
    addDocument(doc, analyzer);
  }

  /**
   * Adds a document to this index, using the provided analyzer instead of the
   * value of {@link #getAnalyzer()}.  If the document contains more than
   * {@link #setMaxFieldLength(int)} terms for a given field, the remainder are
   * discarded.
   */
  public void addDocument(Document doc, Analyzer analyzer) throws IOException {
    DocumentWriter dw =
      new DocumentWriter(ramDirectory, analyzer, this);
    dw.setInfoStream(infoStream);
    String segmentName = newSegmentName();
    dw.addDocument(segmentName, doc);
    synchronized (this) {
      segmentInfos.addElement(new SegmentInfo(segmentName, 1, ramDirectory));
      maybeMergeSegments();
    }
  }

  /** Returns the current value of the segment-name counter. */
  final int getSegmentsCounter() {
    return segmentInfos.counter;
  }
 
  private final synchronized String newSegmentName() {
    return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX);
  }

  /** Determines how often segment indices are merged by addDocument().  With
   * smaller values, less RAM is used while indexing, and searches on
   * unoptimized indices are faster, but indexing speed is slower.  With larger
   * values, more RAM is used during indexing, and while searches on unoptimized
   * indices are slower, indexing is faster.  Thus larger values (> 10) are best
   * for batch index creation, and smaller values (< 10) for indices that are
   * interactively maintained.
   *
   * <p>This must never be less than 2.  The default value is {@link #DEFAULT_MERGE_FACTOR}.
   */
  private int mergeFactor = DEFAULT_MERGE_FACTOR;

  /** Determines the minimal number of documents required before the buffered
   * in-memory documents are merged and a new segment is created.
   * Since Documents are merged in a {@link org.apache.lucene.store.RAMDirectory},
   * a large value gives faster indexing.  At the same time, mergeFactor limits
   * the number of files open in an FSDirectory.
   *
   * <p> The default value is {@link #DEFAULT_MAX_BUFFERED_DOCS}.
   */
  private int minMergeDocs = DEFAULT_MAX_BUFFERED_DOCS;


  /** Determines the largest number of documents ever merged by addDocument().
   * Small values (e.g., less than 10,000) are best for interactive indexing,
   * as this limits the length of pauses while indexing to a few seconds.
   * Larger values are best for batched indexing and speedier searches.
   *
   * <p>The default value is {@link #DEFAULT_MAX_MERGE_DOCS}.
   */
  private int maxMergeDocs = DEFAULT_MAX_MERGE_DOCS;

  /** If non-null, information about merges will be printed to this. */
  private PrintStream infoStream = null;

  /** Merges all segments together into a single segment, optimizing an index
      for search. */
  public synchronized void optimize() throws IOException {
    flushRamSegments();
    while (segmentInfos.size() > 1 ||
           (segmentInfos.size() == 1 &&
            (SegmentReader.hasDeletions(segmentInfos.info(0)) ||
             segmentInfos.info(0).dir != directory ||
             (useCompoundFile &&
              (!SegmentReader.usesCompoundFile(segmentInfos.info(0)) ||
                SegmentReader.hasSeparateNorms(segmentInfos.info(0))))))) {
      int minSegment = segmentInfos.size() - mergeFactor;
      mergeSegments(minSegment < 0 ? 0 : minSegment);
    }
  }

  /** Merges all segments from an array of indexes into this index.
   *
   * <p>This may be used to parallelize batch indexing.  A large document
   * collection can be broken into sub-collections.  Each sub-collection can be
   * indexed in parallel, on a different thread, process or machine.  The
   * complete index can then be created by merging sub-collection indexes
   * with this method.
   *
   * <p>After this completes, the index is optimized. */
  public synchronized void stupidAddIndexes(Directory[] dirs)
      throws IOException {
    optimize();  // start with zero or 1 seg

    int start = segmentInfos.size();

    for (int i = 0; i < dirs.length; i++) {
      SegmentInfos sis = new SegmentInfos();  // read infos from dir
      sis.read(dirs[i]);
      for (int j = 0; j < sis.size(); j++) {
        segmentInfos.addElement(sis.info(j));  // add each info
      }
    }
   
    // merge newly added segments in log(n) passes
    while (segmentInfos.size() > start+mergeFactor) {
      for (int base = start; base < segmentInfos.size(); base++) {
        int end = Math.min(segmentInfos.size(), base+mergeFactor);
        if (end-base > 1)
          mergeSegments(base, end);
      }
    }

    optimize();  // final cleanup
  }

  /** Merges all segments from an array of indexes into this index, without
   * optimizing at the start or the end; each segment is rewritten
   * individually, so the result is not one huge segment. */
  public synchronized void addIndexes(Directory[] dirs)
      throws IOException {
    for (int i = 0; i < dirs.length; i++) {
      SegmentInfos sis = new SegmentInfos();  // read infos from dir
      sis.read(dirs[i]);
      for (int j = 0; j < sis.size(); j++) {
        segmentInfos.addElement(sis.info(j));  // add each info
      }
    }

    // rewrite each segment (existing and newly added) in place; since each
    // range is a single segment, no large merged segment is ever created
    for (int i = segmentInfos.size(); 0 <= --i; ) {
      mergeSegments(i, i + 1);
    }
  }

  /** Merges the provided indexes into this index.
   * <p>After this completes, the index is optimized. </p>
   * <p>The provided IndexReaders are not closed.</p>
   */
  public synchronized void addIndexes(IndexReader[] readers)
    throws IOException {

    optimize();  // start with zero or 1 seg

    final String mergedName = newSegmentName();
    SegmentMerger merger = new SegmentMerger(this, mergedName);

    final Vector segmentsToDelete = new Vector();
    IndexReader sReader = null;
    if (segmentInfos.size() == 1){ // add existing index, if any
        sReader = SegmentReader.get(segmentInfos.info(0));
        merger.add(sReader);
        segmentsToDelete.addElement(sReader);   // queue segment for deletion
    }
     
    for (int i = 0; i < readers.length; i++)      // add new indexes
      merger.add(readers[i]);

    int docCount = merger.merge();                // merge 'em

    segmentInfos.setSize(0);                      // pop old infos & add new
    segmentInfos.addElement(new SegmentInfo(mergedName, docCount, directory));
   
    if(sReader != null)
        sReader.close();

    synchronized (directory) {  // in- & inter-process sync
      new Lock.With(directory.makeLock(COMMIT_LOCK_NAME), commitLockTimeout) {
          public Object doBody() throws IOException {
            segmentInfos.write(directory);  // commit changes
            return null;
          }
        }.run();
    }
   
    deleteSegments(segmentsToDelete);  // delete now-unused segments

    if (useCompoundFile) {
      final Vector filesToDelete = merger.createCompoundFile(mergedName + ".tmp");
      synchronized (directory) { // in- & inter-process sync
        new Lock.With(directory.makeLock(COMMIT_LOCK_NAME), commitLockTimeout) {
          public Object doBody() throws IOException {
            // make compound file visible for SegmentReaders
            directory.renameFile(mergedName + ".tmp", mergedName + ".cfs");
            return null;
          }
        }.run();
      }

      // delete now unused files of segment
      directory.deleteFiles(filesToDelete);
    }
  }

  /** Merges all RAM-resident segments. */
  private final void flushRamSegments() throws IOException {
    int minSegment = segmentInfos.size()-1;
    int docCount = 0;
    while (minSegment >= 0 &&
           (segmentInfos.info(minSegment)).dir == ramDirectory) {
      docCount += segmentInfos.info(minSegment).docCount;
      minSegment--;
    }
    if (minSegment < 0 ||  // add one FS segment?
        (docCount + segmentInfos.info(minSegment).docCount) > mergeFactor ||
        !(segmentInfos.info(segmentInfos.size()-1).dir == ramDirectory))
      minSegment++;
    if (minSegment >= segmentInfos.size())
      return;  // none to merge
    mergeSegments(minSegment);
  }

  /** Incremental segment merger.  */
  private final void maybeMergeSegments() throws IOException {
    long targetMergeDocs = minMergeDocs;
    while (targetMergeDocs <= maxMergeDocs) {
      // find segments smaller than current target size
      int minSegment = segmentInfos.size();
      int mergeDocs = 0;
      while (--minSegment >= 0) {
        SegmentInfo si = segmentInfos.info(minSegment);
        if (si.docCount >= targetMergeDocs)
          break;
        mergeDocs += si.docCount;
      }

      if (mergeDocs >= targetMergeDocs)  // found a merge to do
        mergeSegments(minSegment+1);
      else
        break;

      targetMergeDocs *= mergeFactor;  // increase target size
    }
  }

  /** Pops segments off of segmentInfos stack down to minSegment, merges them,
    and pushes the merged index onto the top of the segmentInfos stack. */
  private final void mergeSegments(int minSegment)
      throws IOException {
    mergeSegments(minSegment, segmentInfos.size());
  }

  /** Callback interface, added in this patch, for observing segment merges. */
  public interface MergeListener {
    void mergeStarted();
    void mergeFinished();
  }

  private MergeListener mergeListener;

  public void setMergeListener(MergeListener mergeListener) {
    this.mergeListener = mergeListener;
  }
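
  /* Example (hypothetical usage, not part of the patch): time each merge
   * through the listener hooks.
   *
   *   writer.setMergeListener(new IndexWriter.MergeListener() {
   *     long start;
   *     public void mergeStarted()  { start = System.currentTimeMillis(); }
   *     public void mergeFinished() {
   *       System.out.println("merge took "
   *           + (System.currentTimeMillis() - start) + " ms");
   *     }
   *   });
   */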

  /** A notification wrapper to the method that does the real job */
  private final void mergeSegments(int minSegment, int end) throws IOException {
    try {
      if (mergeListener != null) {
        mergeListener.mergeStarted();
      }
      _mergeSegments(minSegment, end);
    }
    finally {
      if (mergeListener != null) {
        mergeListener.mergeFinished();
      }
    }
  }

  /** Merges the named range of segments, replacing them in the stack with a
   * single segment. */
  private final void _mergeSegments(int minSegment, int end)
    throws IOException {
    final String mergedName = newSegmentName();
    if (infoStream != null) infoStream.print("merging segments");
    SegmentMerger merger = new SegmentMerger(this, mergedName);

    final Vector segmentsToDelete = new Vector();
    for (int i = minSegment; i < end; i++) {
      SegmentInfo si = segmentInfos.info(i);
      if (infoStream != null)
        infoStream.print(" " + si.name + " (" + si.docCount + " docs)");
      IndexReader reader = SegmentReader.get(si);
      merger.add(reader);
      if ((reader.directory() == this.directory) || // if we own the directory
          (reader.directory() == this.ramDirectory))
        segmentsToDelete.addElement(reader);   // queue segment for deletion
    }

    int mergedDocCount = merger.merge();

    if (infoStream != null) {
      infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)");
    }

    for (int i = end-1; i > minSegment; i--)     // remove old infos & add new
      segmentInfos.remove(i);
    segmentInfos.set(minSegment, new SegmentInfo(mergedName, mergedDocCount,
                                            directory));

    // close readers before we attempt to delete now-obsolete segments
    merger.closeReaders();

    synchronized (directory) {                 // in- & inter-process sync
      new Lock.With(directory.makeLock(COMMIT_LOCK_NAME), commitLockTimeout) {
          public Object doBody() throws IOException {
            segmentInfos.write(directory);     // commit before deleting
            return null;
          }
        }.run();
    }
   
    deleteSegments(segmentsToDelete);  // delete now-unused segments

    if (useCompoundFile) {
      final Vector filesToDelete = merger.createCompoundFile(mergedName + ".tmp");
      synchronized (directory) { // in- & inter-process sync
        new Lock.With(directory.makeLock(COMMIT_LOCK_NAME), commitLockTimeout) {
          public Object doBody() throws IOException {
            // make compound file visible for SegmentReaders
            directory.renameFile(mergedName + ".tmp", mergedName + ".cfs");
            return null;
          }
        }.run();
      }

      // delete now unused files of segment
      directory.deleteFiles(filesToDelete);
    }
  }

  /*
   * Some operating systems (e.g. Windows) don't permit a file to be deleted
   * while it is open for read (e.g. by another process or thread). The
   * queue-and-retry logic that copes with this now lives in the Directory
   * implementations (see FSDirectory.deleteFiles), so here we simply hand
   * each reader's files to the directory that owns them.
   */

  private final void deleteSegments(Vector segments) throws IOException {
    for (int i = 0; i < segments.size(); i++) {
      SegmentReader reader = (SegmentReader)segments.elementAt(i);
      reader.directory().deleteFiles(reader.files());
    }
  }

}

package org.apache.lucene.store;

/**
 * Copyright 2004 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.Vector;

/** A Directory is a flat list of files.  Files may be written once, when they
 * are created.  Once a file is created it may only be opened for read, or
 * deleted.  Random access is permitted both when reading and writing.
 *
 * <p> Java's i/o APIs are not used directly; rather, all i/o goes
 * through this API.  This permits things such as: <ul>
 * <li> implementation of RAM-based indices;
 * <li> implementation of indices stored in a database, via JDBC;
 * <li> implementation of an index as a single file;
 * </ul>
 *
 * @author Doug Cutting
 */
public abstract class Directory {
  /** Returns an array of strings, one for each file in the directory. */
  public abstract String[] list()
       throws IOException;

  /** Returns true iff a file with the given name exists. */
  public abstract boolean fileExists(String name)
       throws IOException;

  /** Returns the time the named file was last modified. */
  public abstract long fileModified(String name)
       throws IOException;

  /** Set the modified time of an existing file to now. */
  public abstract void touchFile(String name)
       throws IOException;

  /** Removes an existing file in the directory. */
  public abstract void deleteFile(String name)
       throws IOException;

  /** Removes the specified files from the directory. */
  public abstract void deleteFiles(Vector files)
       throws IOException;

  /** Renames an existing file in the directory.
    If a file already exists with the new name, then it is replaced.
    This replacement should be atomic. */
  public abstract void renameFile(String from, String to)
       throws IOException;

  /** Returns the length of a file in the directory. */
  public abstract long fileLength(String name)
       throws IOException;


  /** Creates a new, empty file in the directory with the given name.
      Returns a stream writing this file. */
  public abstract IndexOutput createOutput(String name) throws IOException;


  /** Returns a stream reading an existing file. */
  public abstract IndexInput openInput(String name)
    throws IOException;

  /** Construct a {@link Lock}.
   * @param name the name of the lock file
   */
  public abstract Lock makeLock(String name);

  /** Closes the store. */
  public abstract void close()
       throws IOException;
}

package org.apache.lucene.store;

/**
 * Copyright 2004 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Hashtable;
import java.util.Vector;

import org.apache.lucene.index.IndexFileNameFilter;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexWriter;

/**
 * Straightforward implementation of {@link Directory} as a directory of files.
 *
 * @see Directory
 * @author Doug Cutting
 */
public class FSDirectory extends Directory {

  /** This cache of directories ensures that there is a unique Directory
   * instance per path, so that synchronization on the Directory can be used to
   * synchronize access between readers and writers.
   *
   * This should be a WeakHashMap, so that entries can be GC'd, but that would
   * require Java 1.2.  Instead we use refcounts...
   */
  private static final Hashtable DIRECTORIES = new Hashtable();

  private static boolean disableLocks = false;

  /**
   * Set whether Lucene's use of lock files is disabled. By default,
   * lock files are enabled. They should only be disabled if the index
   * is on a read-only medium like a CD-ROM.
   */
  public static void setDisableLocks(boolean doDisableLocks) {
    FSDirectory.disableLocks = doDisableLocks;
  }

  /**
   * Returns whether Lucene's use of lock files is disabled.
   * @return true if locks are disabled, false if locks are enabled.
   */
  public static boolean getDisableLocks() {
    return FSDirectory.disableLocks;
  }

  /**
   * Directory specified by <code>org.apache.lucene.lockDir</code>
   * or <code>java.io.tmpdir</code> system property
   */
  public static final String LOCK_DIR =
    System.getProperty("org.apache.lucene.lockDir",
      System.getProperty("java.io.tmpdir"));

  /** The default class which implements filesystem-based directories. */
  private static Class IMPL;
  static {
    try {
      String name =
        System.getProperty("org.apache.lucene.FSDirectory.class",
                           FSDirectory.class.getName());
      IMPL = Class.forName(name);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("cannot load FSDirectory class: " + e.toString(), e);
    } catch (SecurityException se) {
      try {
        IMPL = Class.forName(FSDirectory.class.getName());
      } catch (ClassNotFoundException e) {
        throw new RuntimeException("cannot load default FSDirectory class: " + e.toString(), e);
      }
    }
  }

  private static MessageDigest DIGESTER;

  static {
    try {
      DIGESTER = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
        throw new RuntimeException(e.toString(), e);
    }
  }

  /** A buffer optionally used in renameTo method */
  private byte[] buffer = null;

  /** Returns the directory instance for the named location.
   *
   * <p>Directories are cached, so that, for a given canonical path, the same
   * FSDirectory instance will always be returned.  This permits
   * synchronization on directories.
   *
   * @param path the path to the directory.
   * @param create if true, create, or erase any existing contents.
   * @return the FSDirectory for the named file.  */
  public static FSDirectory getDirectory(String path, boolean create)
      throws IOException {
    return getDirectory(new File(path), create);
  }

  /** Returns the directory instance for the named location.
   *
   * <p>Directories are cached, so that, for a given canonical path, the same
   * FSDirectory instance will always be returned.  This permits
   * synchronization on directories.
   *
   * @param file the path to the directory.
   * @param create if true, create, or erase any existing contents.
   * @return the FSDirectory for the named file.  */
  public static FSDirectory getDirectory(File file, boolean create)
    throws IOException {
    file = new File(file.getCanonicalPath());
    FSDirectory dir;
    synchronized (DIRECTORIES) {
      dir = (FSDirectory)DIRECTORIES.get(file);
      if (dir == null) {
        try {
          dir = (FSDirectory)IMPL.newInstance();
        } catch (Exception e) {
          throw new RuntimeException("cannot load FSDirectory class: " + e.toString(), e);
        }
        dir.init(file, create);
        DIRECTORIES.put(file, dir);
      } else if (create) {
        dir.create();
      }
    }
    synchronized (dir) {
      dir.refCount++;
    }
    return dir;
  }

  private File directory = null;
  private int refCount;
  private File lockDir;

  protected FSDirectory() {}                      // permit subclassing

  private void init(File path, boolean create) throws IOException {
    directory = path;

    if (LOCK_DIR == null) {
      lockDir = directory;
    }
    else {
      lockDir = new File(LOCK_DIR);
    }
    // Ensure that lockDir exists and is a directory.
    if (!lockDir.exists()) {
      if (!lockDir.mkdirs())
        throw new IOException("Cannot create directory: " + lockDir.getAbsolutePath());
    } else if (!lockDir.isDirectory()) {
      throw new IOException("Found regular file where directory expected: " +
          lockDir.getAbsolutePath());
    }
    if (create) {
      create();
    }

    if (!directory.isDirectory())
      throw new IOException(path + " not a directory");
  }

  private synchronized void create() throws IOException {
    if (!directory.exists())
      if (!directory.mkdirs())
        throw new IOException("Cannot create directory: " + directory);

    if (!directory.isDirectory())
      throw new IOException(directory + " not a directory");

    String[] files = directory.list(new IndexFileNameFilter());            // clear old files
    if (files == null)
      throw new IOException("Cannot read directory " + directory.getAbsolutePath());
    for (int i = 0; i < files.length; i++) {
      File file = new File(directory, files[i]);
      if (!file.delete())
        throw new IOException("Cannot delete " + file);
    }

    String lockPrefix = getLockPrefix().toString(); // clear old locks
    files = lockDir.list();
    if (files == null)
      throw new IOException("Cannot read lock directory " + lockDir.getAbsolutePath());
    for (int i = 0; i < files.length; i++) {
      if (!files[i].startsWith(lockPrefix))
        continue;
      File lockFile = new File(lockDir, files[i]);
      if (!lockFile.delete())
        throw new IOException("Cannot delete " + lockFile);
    }
  }

  /** Returns an array of strings, one for each file in the directory. */
  public String[] list() {
    return directory.list();
  }

  /** Returns true iff a file with the given name exists. */
  public boolean fileExists(String name) {
    File file = new File(directory, name);
    return file.exists();
  }

  /** Returns the time the named file was last modified. */
  public long fileModified(String name) {
    File file = new File(directory, name);
    return file.lastModified();
  }

  /** Returns the time the named file was last modified. */
  public static long fileModified(File directory, String name) {
    File file = new File(directory, name);
    return file.lastModified();
  }

  /** Set the modified time of an existing file to now. */
  public void touchFile(String name) {
    File file = new File(directory, name);
    file.setLastModified(System.currentTimeMillis());
  }

  /** Returns the length in bytes of a file in the directory. */
  public long fileLength(String name) {
    File file = new File(directory, name);
    return file.length();
  }

  /** Removes an existing file in the directory. */
  public void deleteFile(String name) throws IOException {
    File file = new File(directory, name);
    if (!file.delete())
      throw new IOException("Cannot delete " + file);
  }

  /** renameFile is not atomic, though it should be.
   *  This is a humble attempt to resolve the issue:
   *  any piece of code that relies on renameFile atomicity
   *  should synchronize on FSDirectory.RENAME_LOCK.
   */
  public static final Object RENAME_LOCK = new Object();

  /** Renames an existing file in the directory. */
  public synchronized void renameFile(String from, String to)
      throws IOException {
    File old = new File(directory, from);
    File nu = new File(directory, to);

    /* This is not atomic.  If the program crashes between the call to
       delete() and the call to renameTo() then we're screwed, but I've
       been unable to figure out how else to do this...
       ...
       Stenly: But sync-ing on RENAME_LOCK should solve the problem in case
               of concurrent threads within the same process.
    */
   synchronized (FSDirectory.RENAME_LOCK) {
    if (nu.exists())
      if (!nu.delete())
        throw new IOException("Cannot delete " + nu);

    // Rename the old file to the new one. Unfortunately, the renameTo()
    // method does not work reliably under some JVMs.  Therefore, if the
    // rename fails, we manually rename by copying the old file to the new one
    if (!old.renameTo(nu)) {
      java.io.InputStream in = null;
      java.io.OutputStream out = null;
      try {
        in = new FileInputStream(old);
        out = new FileOutputStream(nu);
        // see if the buffer needs to be initialized. Initialization is
        // only done on-demand since many VM's will never run into the renameTo
        // bug and hence shouldn't waste 1K of mem for no reason.
        if (buffer == null) {
          buffer = new byte[1024];
        }
        int len;
        while ((len = in.read(buffer)) >= 0) {
          out.write(buffer, 0, len);
        }

        // delete the old file.
        old.delete();
      }
      catch (IOException ioe) {
        IOException newExc = new IOException("Cannot rename " + old + " to " + nu);
        newExc.initCause(ioe);
        throw newExc;
      }
      finally {
        if (in != null) {
          try {
            in.close();
          } catch (IOException e) {
            throw new RuntimeException("Cannot close input stream: " + e.toString(), e);
          }
        }
        if (out != null) {
          try {
            out.close();
          } catch (IOException e) {
            throw new RuntimeException("Cannot close output stream: " + e.toString(), e);
          }
        }
      }
    }
   }
  }

  /** Creates a new, empty file in the directory with the given name.
      Returns a stream writing this file. */
  public IndexOutput createOutput(String name) throws IOException {
    File file = new File(directory, name);
    if (file.exists() && !file.delete())          // delete existing, if any
      throw new IOException("Cannot overwrite: " + file);

    return new FSIndexOutput(file);
  }

  /** Returns a stream reading an existing file. */
  public IndexInput openInput(String name) throws IOException {
    return new FSIndexInput(new File(directory, name));
  }

  /**
   * So we can do some byte-to-hexchar conversion below
   */
  private static final char[] HEX_DIGITS =
  {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'};

  /** Constructs a {@link Lock} with the specified name.  Locks are implemented
   * with {@link File#createNewFile()}.
   *
   * @param name the name of the lock file
   * @return an instance of <code>Lock</code> holding the lock
   */
  public Lock makeLock(String name) {
    StringBuffer buf = getLockPrefix();
    buf.append("-")8

Re: design change suggestion

Michael McCandless-2

Hello,

These sound very interesting!

I think some of them would go under contrib (as utility tools?) and
others maybe into the core.  I've added more detailed comments below.

Stanislav Jordanov wrote:
> Hi guys,
>
> For the purposes of our product we've devised a bunch of small tool
> classes which handle various utility tasks, such as:
> 1. IndexRecoverer - assuming the "segments" file is missing or
> corrupted, this tool rebuilds it based on the *.cfs (and other) files
> found in the index dir, excluding the files listed in 'deletable'

Excellent.  I know that various cases of "recovering an index" have
come up on the lists over time.  It would be great to have a single
tool that can try to correct the different problems that users hit, e.g.
removing a single unusable segments file, regenerating the segments
file, etc.

> 2. IndexSplitter - splits an existing index into 2, 3 or more roughly
> equally sized indices. It simply distributes the segment files across
> distinct directories and then uses the IndexRecoverer to rebuild each
> new index's segments file

Seems like a good tool for contrib?

> 3. IndexMerger - the reverse of IndexSplitter: merges several indices
> into a single index. It uses a modified version of IndexWriter.addIndexes
> that does not optimize() at the beginning or at the end. This way the
> resulting index is not a single huge cfs file, which is desirable in
> some cases.

You should have a look at the current Lucene trunk: a new method
(called addIndexesNoOptimize) has been added that I think addresses
this same need.
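
For example (a sketch against the trunk API; directory variables and
analyzer assumed):

    IndexWriter writer = new IndexWriter("/path/to/dest", analyzer, false);
    writer.addIndexesNoOptimize(new Directory[] { dir1, dir2 });
    writer.close();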

> 4. IndexOptimizer - optimizes an existing index by merging the 'small'
> segments and compacting the large ones (compacting means 'removing
> the deleted docs within them'). It also converts to the compound file
> format any old-style "spilled" segments.

Ooh -- this sounds like a lighter weight version of the current
"optimize"?  Compacting single segments would be particularly useful
for very large indices that receive many updates to each doc.  It
seems like this could be a new method on IndexWriter?

Though I think this could break the index segments invariants (new
merge policy in IndexWriter in the trunk) when there are many deletes
against the large older segments (I think a fairly typical use case
actually).

> All of the above-mentioned tools are classes within the
> org.apache.lucene.index package, as they use some package-scope methods
> and properties (plus, they feel like they belong there).
>
> Now the design change suggestion - it concerns the 'deletable'-related
> code.
> According to the source comments, the delayed deletion of files
> through 'deletable' is required on Windows only, as this OS prevents
> files that are opened for reading from being deleted.
> Working on the IndexOptimizer tool I found myself in a situation where I
> needed to 'safe delete' a bunch of obsolete segments while having only
> an (FS)Directory and a segment file name. But the 'safe delete' feature
> is in IndexWriter. After reviewing the code I came to the
> conclusion that the 'safe delete' feature logically belongs to the
> (FS)Directory class, not to IndexWriter, so I moved the
> corresponding code from IndexWriter to (FS)Directory; IMO this way is
> better.

You should also look at the trunk for this one.  The deletion logic
has moved into a separate class (IndexFileDeleter) which handles
figuring out which files 1) look to be Lucene index files, but 2) are
not in fact referenced by the current segments file, and then
safely deletes them (with retries).

Mike



Re: design change suggestion

Doug Cutting
Michael McCandless wrote:

>> 1. IndexRecoverer - assuming the "segments" file is missing or
>> corrupted, this tool rebuilds it based on the *.cfs (and other) files
>> found in the index dir, excluding the files listed in 'deletable'
>
> Excellent.  I know that various cases of "recovering an index" have
> come up on the lists over time.  It would be great to have a single
> tool that can try to correct the different problems that users hit, eg
> removing a single unusable segments file, regenerating the segments
> file, etc.
>
>> 2. IndexSplitter - splits an existing index into 2, 3 or more roughly
>> equally sized indices. It simply distributes the segment files across
>> distinct directories and then uses the IndexRecoverer to rebuild each
>> new index's segments file
>
> Seems like a good tool for contrib?

If these rely only on the public index-format spec and public index
apis, then they could go in contrib, which would be easiest, since
expectations about back-compatibility and long-term support are lower
for contrib.

But if they rely on index package internals then they should be
maintained with the core.  Then the question becomes: are these features
that we can maintain long-term?  The index implementation will likely
evolve, and the existing public API should be supported through this
evolution: APIs must be more durable than implementations.  So, are
these features things that can be supported through likely
implementation changes?

I suspect they are.  We've talked about making the postings format more
flexible, but I have not heard anyone talk about a need to substantially
alter the segments & merging model.  Are we comfortable adding public
APIs that depend on that model?

An index splitter is useful with parallel and/or distributed search.
Splitting on segment boundaries is fairly limited, but perhaps, with
clever use of IndexWriter.setMaxMergeDocs(), it is sufficient.
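
For example (illustrative numbers only):

    IndexWriter writer = new IndexWriter(dir, analyzer, true);
    writer.setMaxMergeDocs(100000); // no merged segment exceeds 100k docs,
                                    // keeping segment-boundary splits balanced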

Doug
