[jira] Created: (NUTCH-202) Mapper, Reducer need an occasion to cleanup after the last record is processed.


Michael Gibney (Jira)
Mapper, Reducer need an occasion to cleanup after the last record is processed.
-------------------------------------------------------------------------------

         Key: NUTCH-202
         URL: http://issues.apache.org/jira/browse/NUTCH-202
     Project: Nutch
        Type: Improvement
 Environment: Linux
    Reporter: Michel Tourn


Mapper and Reducer implementations need a hook to do some cleanup after the last record is processed.
Proposal (patch attached):
in interface Mapper:
 add method void finished();
in interface Reducer:
 add method void finished();

The finished() methods are called from MapRunner (for the Mapper), from MapTask via CombiningCollector (for the combiner), and from ReduceTask (for the Reducer).
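
To illustrate the intended lifecycle, here is a minimal, self-contained sketch. It does not use the real Nutch types: SimpleMapper, CountingMapper, and run() are hypothetical stand-ins, and only the try/finally shape is taken from the MapRunner change in the patch.

```java
import java.util.ArrayList;
import java.util.List;

public class FinishedHookSketch {

  /** Simplified stand-in for the proposed Mapper interface (not the real Nutch API). */
  interface SimpleMapper {
    void map(String key, String value, List<String> output);

    /** Called once, after the last map() call on this object. */
    void finished();
  }

  /** Hypothetical mapper that tracks state and releases it in finished(). */
  static class CountingMapper implements SimpleMapper {
    int records = 0;
    boolean closed = false;

    public void map(String key, String value, List<String> output) {
      records++;
      output.add(key + "=" + value);
    }

    public void finished() {
      // A real implementation might close a file or a connection here.
      closed = true;
    }
  }

  /** Mirrors the try/finally shape that the patch gives MapRunner.run(). */
  static void run(String[][] input, SimpleMapper mapper, List<String> output) {
    try {
      for (String[] pair : input) {
        mapper.map(pair[0], pair[1], output);
      }
    } finally {
      // Runs even if map() throws, so cleanup is not skipped on failure.
      mapper.finished();
    }
  }

  public static void main(String[] args) {
    CountingMapper mapper = new CountingMapper();
    List<String> output = new ArrayList<>();
    run(new String[][] {{"a", "1"}, {"b", "2"}}, mapper, output);
    System.out.println(output + " records=" + mapper.records + " closed=" + mapper.closed);
  }
}
```

Because the call sits in a finally block, finished() also runs when map() throws, which is probably the behavior most cleanup code wants.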
------------
Known limitation: Fetcher (a multithreaded MapRunnable) does not call finished().
This is not currently a problem because the Fetcher Map/Reduce modules do not do anything in finished().
The right way to add finished() support to Fetcher would be to wait for all threads to finish,
and then do:
     if (collector instanceof CombiningCollector) ((CombiningCollector)collector).finished();
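
The "wait for all threads, then call finished()" ordering could look roughly like the sketch below. This is not Fetcher code: FinishableCollector and runThreads() are hypothetical names, and the worker threads only stand in for fetcher threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadedFinishSketch {

  /** Hypothetical stand-in for a collector that exposes finished(). */
  static class FinishableCollector {
    final AtomicInteger collected = new AtomicInteger();
    volatile int collectedAtFinish = -1;

    void collect(String key) {
      collected.incrementAndGet();
    }

    synchronized void finished() {
      // Record how many items had arrived when cleanup ran.
      collectedAtFinish = collected.get();
    }
  }

  /** Starts the workers, joins all of them, and only then calls finished(). */
  static void runThreads(int threadCount, int recordsPerThread, FinishableCollector collector) {
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < threadCount; i++) {
      Thread t = new Thread(() -> {
        for (int j = 0; j < recordsPerThread; j++) {
          collector.collect("url-" + j);
        }
      });
      threads.add(t);
      t.start();
    }
    try {
      for (Thread t : threads) {
        t.join(); // wait until this worker has emitted its last record
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    }
    // Every worker is done, so no collect() call can race with finished().
    collector.finished();
  }

  public static void main(String[] args) {
    FinishableCollector collector = new FinishableCollector();
    runThreads(4, 10, collector);
    System.out.println("collected at finish: " + collector.collectedAtFinish);
  }
}
```

Joining every thread before calling finished() is what guarantees that cleanup sees all collected records; calling it from any single worker thread would not.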
------------
patch begins: (svn trunk)

Index: src/test/org/apache/nutch/mapred/MapredLoadTest.java
===================================================================
--- src/test/org/apache/nutch/mapred/MapredLoadTest.java (revision 374781)
+++ src/test/org/apache/nutch/mapred/MapredLoadTest.java (working copy)
@@ -69,6 +69,8 @@
                 out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
             }
         }
+        public void finished() {
+        }
     }
     static class RandomGenReducer implements Reducer {
         public void configure(JobConf job) {
@@ -81,6 +83,8 @@
                 out.collect(new UTF8("" + val), new UTF8(""));
             }
         }
+        public void finished() {
+        }
     }
     static class RandomCheckMapper implements Mapper {
         public void configure(JobConf job) {
@@ -92,6 +96,8 @@
 
             out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
         }
+        public void finished() {
+        }
     }
     static class RandomCheckReducer implements Reducer {
         public void configure(JobConf job) {
@@ -106,6 +112,8 @@
             }
             out.collect(new IntWritable(keyint), new IntWritable(count));
         }
+        public void finished() {
+        }
     }
 
     int range;
Index: src/test/org/apache/nutch/fs/TestNutchFileSystem.java
===================================================================
--- src/test/org/apache/nutch/fs/TestNutchFileSystem.java (revision 374783)
+++ src/test/org/apache/nutch/fs/TestNutchFileSystem.java (working copy)
@@ -155,6 +155,8 @@
 
       reporter.setStatus("wrote " + name);
     }
+    
+    public void finished() {}
   }
 
   public static void writeTest(NutchFileSystem fs, boolean fastCheck)
@@ -247,6 +249,9 @@
 
       reporter.setStatus("read " + name);
     }
+    
+    public void finished() {}
+    
   }
 
   public static void readTest(NutchFileSystem fs, boolean fastCheck)
@@ -339,6 +344,9 @@
         in.close();
       }
     }
+    
+    public void finished() {}
+    
   }
 
   public static void seekTest(NutchFileSystem fs, boolean fastCheck)
Index: src/java/org/apache/nutch/indexer/DeleteDuplicates.java
===================================================================
--- src/java/org/apache/nutch/indexer/DeleteDuplicates.java (revision 374776)
+++ src/java/org/apache/nutch/indexer/DeleteDuplicates.java (working copy)
@@ -225,6 +225,7 @@
         }
       }
     }
+    public void finished() {}
   }
     
   private NutchFileSystem fs;
@@ -265,6 +266,8 @@
       reader.close();
     }
   }
+  
+  public void finished() {}
 
   /** Write nothing. */
   public RecordWriter getRecordWriter(final NutchFileSystem fs,
Index: src/java/org/apache/nutch/indexer/Indexer.java
===================================================================
--- src/java/org/apache/nutch/indexer/Indexer.java (revision 374778)
+++ src/java/org/apache/nutch/indexer/Indexer.java (working copy)
@@ -227,6 +227,8 @@
 
     output.collect(key, new ObjectWritable(doc));
   }
+  
+  public void finished() {}
 
   public void index(File indexDir, File crawlDb, File linkDb, File[] segments)
     throws IOException {
Index: src/java/org/apache/nutch/segment/SegmentReader.java
===================================================================
--- src/java/org/apache/nutch/segment/SegmentReader.java (revision 374778)
+++ src/java/org/apache/nutch/segment/SegmentReader.java (working copy)
@@ -143,7 +143,9 @@
     }
     output.collect(key, new ObjectWritable(dump.toString()));
   }
-
+  
+  public void finished() {}
+  
   public void reader(File segment) throws IOException {
     LOG.info("Reader: segment: " + segment);
 
Index: src/java/org/apache/nutch/mapred/Mapper.java
===================================================================
--- src/java/org/apache/nutch/mapred/Mapper.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/Mapper.java (working copy)
@@ -39,4 +39,9 @@
   void map(WritableComparable key, Writable value,
            OutputCollector output, Reporter reporter)
     throws IOException;
+
+  /** Called after the last {@link #map} call on this Mapper object.
+      Typical implementations do nothing.
+  */
+  void finished();
 }
Index: src/java/org/apache/nutch/mapred/lib/RegexMapper.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/RegexMapper.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/RegexMapper.java (working copy)
@@ -53,4 +53,5 @@
       output.collect(new UTF8(matcher.group(group)), new LongWritable(1));
     }
   }
+  public void finished() {}
 }
Index: src/java/org/apache/nutch/mapred/lib/InverseMapper.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/InverseMapper.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/InverseMapper.java (working copy)
@@ -38,4 +38,6 @@
     throws IOException {
     output.collect((WritableComparable)value, key);
   }
+
+  public void finished() {}
 }
Index: src/java/org/apache/nutch/mapred/lib/IdentityReducer.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/IdentityReducer.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/IdentityReducer.java (working copy)
@@ -42,4 +42,5 @@
     }
   }
 
+  public void finished() {}
 }
Index: src/java/org/apache/nutch/mapred/lib/IdentityMapper.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/IdentityMapper.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/IdentityMapper.java (working copy)
@@ -39,4 +39,5 @@
     output.collect(key, val);
   }
 
+  public void finished() {}
 }
Index: src/java/org/apache/nutch/mapred/lib/LongSumReducer.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/LongSumReducer.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/LongSumReducer.java (working copy)
@@ -47,4 +47,6 @@
     // output sum
     output.collect(key, new LongWritable(sum));
   }
+
+  public void finished() {}
 }
Index: src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java (working copy)
@@ -50,4 +50,6 @@
       output.collect(new UTF8(st.nextToken()), new LongWritable(1));
     }
   }
+
+  public void finished() {}
 }
Index: src/java/org/apache/nutch/mapred/ReduceTask.java
===================================================================
--- src/java/org/apache/nutch/mapred/ReduceTask.java (revision 374781)
+++ src/java/org/apache/nutch/mapred/ReduceTask.java (working copy)
@@ -275,6 +275,7 @@
       }
 
     } finally {
+      reducer.finished();
       in.close();
       lfs.delete(new File(sortedFile));           // remove sorted
       out.close(reporter);
Index: src/java/org/apache/nutch/mapred/MapTask.java
===================================================================
--- src/java/org/apache/nutch/mapred/MapTask.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/MapTask.java (working copy)
@@ -50,7 +50,7 @@
   public void write(DataOutput out) throws IOException {
     super.write(out);
     split.write(out);
-    
+
   }
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
@@ -126,6 +126,10 @@
         }
 
       } finally {
+        if (combining) {
+          ((CombiningCollector)collector).finished();
+        }
+
         in.close();                               // close input
       }
     } finally {
@@ -147,5 +151,5 @@
   public NutchConf getConf() {
     return this.nutchConf;
   }
-  
+
 }
Index: src/java/org/apache/nutch/mapred/MapRunner.java
===================================================================
--- src/java/org/apache/nutch/mapred/MapRunner.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/MapRunner.java (working copy)
@@ -38,18 +38,22 @@
   public void run(RecordReader input, OutputCollector output,
                   Reporter reporter)
     throws IOException {
-    while (true) {
-      // allocate new key & value instances
-      WritableComparable key =
-        (WritableComparable)job.newInstance(inputKeyClass);
-      Writable value = (Writable)job.newInstance(inputValueClass);
+    try {
+      while (true) {
+        // allocate new key & value instances
+        WritableComparable key =
+          (WritableComparable)job.newInstance(inputKeyClass);
+        Writable value = (Writable)job.newInstance(inputValueClass);
 
-      // read next key & value
-      if (!input.next(key, value))
-        return;
+        // read next key & value
+        if (!input.next(key, value))
+          return;
 
-      // map pair to output
-      mapper.map(key, value, output, reporter);
+        // map pair to output
+        mapper.map(key, value, output, reporter);
+      }
+    } finally {
+        mapper.finished();
     }
   }
 
Index: src/java/org/apache/nutch/mapred/CombiningCollector.java
===================================================================
--- src/java/org/apache/nutch/mapred/CombiningCollector.java (revision 374780)
+++ src/java/org/apache/nutch/mapred/CombiningCollector.java (working copy)
@@ -78,4 +78,9 @@
     count = 0;
   }
 
+  public synchronized void finished()
+  {
+    combiner.finished();
+  }
+
 }
Index: src/java/org/apache/nutch/mapred/Reducer.java
===================================================================
--- src/java/org/apache/nutch/mapred/Reducer.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/Reducer.java (working copy)
@@ -38,4 +38,10 @@
   void reduce(WritableComparable key, Iterator values,
               OutputCollector output, Reporter reporter)
     throws IOException;
+
+  /** Called after the last {@link #reduce} call on this Reducer object.
+      Typical implementations do nothing.
+  */
+  void finished();
+
 }
Index: src/java/org/apache/nutch/crawl/CrawlDbReader.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDbReader.java (revision 374737)
+++ src/java/org/apache/nutch/crawl/CrawlDbReader.java (working copy)
@@ -50,9 +50,9 @@
 
 /**
  * Read utility for the CrawlDB.
- *
+ *
  * @author Andrzej Bialecki
- *
+ *
  */
 public class CrawlDbReader {
 
@@ -68,6 +68,7 @@
       output.collect(new UTF8("retry"), new LongWritable(cd.getRetriesSinceFetch()));
       output.collect(new UTF8("score"), new LongWritable((long) (cd.getScore() * 1000.0)));
     }
+    public void finished() {}
   }
 
   public static class CrawlDbStatReducer implements Reducer {
@@ -121,6 +122,7 @@
         output.collect(new UTF8("avg score"), new LongWritable(total / cnt));
       }
     }
+    public void finished() {}
   }
 
   public static class CrawlDbDumpReducer implements Reducer {
@@ -133,8 +135,11 @@
 
     public void configure(JobConf job) {
     }
+
+    public void finished() {
+    }
   }
-  
+
   public void processStatJob(String crawlDb, NutchConf config) throws IOException {
     LOG.info("CrawlDb statistics start: " + crawlDb);
     File tmpFolder = new File(crawlDb, "stat_tmp" + System.currentTimeMillis());
@@ -219,7 +224,7 @@
       System.out.println("not found");
     }
   }
-  
+
   public void processDumpJob(String crawlDb, String output, NutchConf config) throws IOException {
 
     LOG.info("CrawlDb dump: starting");
@@ -270,4 +275,5 @@
     }
     return;
   }
+
 }
Index: src/java/org/apache/nutch/crawl/LinkDb.java
===================================================================
--- src/java/org/apache/nutch/crawl/LinkDb.java (revision 374779)
+++ src/java/org/apache/nutch/crawl/LinkDb.java (working copy)
@@ -118,7 +118,8 @@
     output.collect(key, result);
   }
 
-
+  public void finished() {}
+
   public void invert(File linkDb, File segmentsDir) throws IOException {
     LOG.info("LinkDb: starting");
     LOG.info("LinkDb: linkdb: " + linkDb);
Index: src/java/org/apache/nutch/crawl/Injector.java
===================================================================
--- src/java/org/apache/nutch/crawl/Injector.java (revision 374779)
+++ src/java/org/apache/nutch/crawl/Injector.java (working copy)
@@ -65,6 +65,8 @@
                                              interval));
       }
     }
+    
+    public void finished() {}
   }
 
   /** Combine multiple new entries for a url. */
@@ -76,6 +78,7 @@
       throws IOException {
       output.collect(key, (Writable)values.next()); // just collect first value
     }
+    public void finished() {}
   }
 
   /** Construct an Injector. */
Index: src/java/org/apache/nutch/crawl/Generator.java
===================================================================
--- src/java/org/apache/nutch/crawl/Generator.java (revision 374779)
+++ src/java/org/apache/nutch/crawl/Generator.java (working copy)
@@ -63,6 +63,8 @@
       output.collect(crawlDatum, key);          // invert for sort by score
     }
 
+    public void finished() {}
+    
     /** Partition by host (value). */
     public int getPartition(WritableComparable key, Writable value,
                             int numReduceTasks) {
Index: src/java/org/apache/nutch/crawl/CrawlDbReducer.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDbReducer.java (revision 374781)
+++ src/java/org/apache/nutch/crawl/CrawlDbReducer.java (working copy)
@@ -115,4 +115,5 @@
     }
   }
 
+  public void finished() {}
 }
Index: src/java/org/apache/nutch/parse/ParseSegment.java
===================================================================
--- src/java/org/apache/nutch/parse/ParseSegment.java (revision 374776)
+++ src/java/org/apache/nutch/parse/ParseSegment.java (working copy)
@@ -78,6 +78,8 @@
     throws IOException {
     output.collect(key, (Writable)values.next()); // collect first value
   }
+  
+  public void finished() {}
 
   public void parse(File segment) throws IOException {
     LOG.info("Parse: starting");
