Stefan Neufeind wrote:
> How about making this a command-line option to inject? Could you
> create an improvement patch?
FWIW, a patch with similar functionality is in my work-in-progress
queue; however, it's for 0.8 - there is no point in backporting it,
because the architecture is very different...
Here's a snippet. In short: newly injected entries get a temporary
STATUS_INJECTED status, and the reducer can then selectively reset the
score, the fetch schedule and/or the metadata of existing CrawlDb
entries, depending on a "db.injected.reset.mask" property:
....
Index: src/java/org/apache/nutch/crawl/Injector.java
===================================================================
--- src/java/org/apache/nutch/crawl/Injector.java (revision 412602)
+++ src/java/org/apache/nutch/crawl/Injector.java (working copy)
@@ -20,10 +20,11 @@
import java.util.*;
import java.util.logging.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.LogFormatter;
import org.apache.hadoop.mapred.*;
import org.apache.nutch.net.*;
@@ -35,8 +36,8 @@
/** This class takes a flat file of URLs and adds them to the of pages to be
 * crawled. Useful for bootstrapping the system. */
public class Injector extends Configured {
- public static final Logger LOG =
- LogFormatter.getLogger("org.apache.nutch.crawl.Injector");
+ public static final Log LOG =
+ LogFactory.getLog(Injector.class);
/** Normalize and filter injected urls. */
@@ -46,7 +47,8 @@
private float scoreInjected;
private JobConf jobConf;
private URLFilters filters;
- private ScoringFilters scfilters;
+ private ScoringFilters scfilters;
+ private FetchSchedule schedule;
public void configure(JobConf job) {
this.jobConf = job;
@@ -55,6 +57,7 @@
filters = new URLFilters(jobConf);
scfilters = new ScoringFilters(jobConf);
scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
+ schedule = FetchScheduleFactory.getFetchSchedule(job);
}
public void close() {}
@@ -69,17 +72,19 @@
url = urlNormalizer.normalize(url); // normalize the url
url = filters.filter(url); // filter the url
} catch (Exception e) {
- LOG.warning("Skipping " +url+":"+e);
+ LOG.warn("Skipping " +url+":"+e);
url = null;
}
if (url != null) { // if it passes
value.set(url); // collect it
-      CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED, interval);
+ CrawlDatum datum = new CrawlDatum();
+ datum.setStatus(CrawlDatum.STATUS_INJECTED);
+ schedule.initializeSchedule(value, datum);
datum.setScore(scoreInjected);
try {
scfilters.initialScore(value, datum);
} catch (ScoringFilterException e) {
- LOG.warning("Cannot filter init score for url " + url +
+ LOG.warn("Cannot filter init score for url " + url +
", using default (" + e.getMessage() + ")");
datum.setScore(scoreInjected);
}
@@ -90,13 +95,87 @@
/** Combine multiple new entries for a url. */
public static class InjectReducer implements Reducer {
- public void configure(JobConf job) {}
+ private static final int RESET_NONE = 0x0000;
+ private static final int RESET_SCORE = 0x0001;
+ private static final int RESET_SCHEDULE = 0x0002;
+ private static final int RESET_METADATA = 0x0004;
+ private static final int RESET_ALL = 0x00ff;
+
+ private static final int[] masks = {
+ RESET_NONE,
+ RESET_SCORE,
+ RESET_SCHEDULE,
+ RESET_METADATA,
+ RESET_ALL
+ };
+ private static final String[] maskNames = {
+ "none",
+ "score",
+ "schedule",
+ "metadata",
+ "all"
+ };
+
+ private CrawlDatum injected, existing;
+ private int resetMode;
+ private FetchSchedule schedule;
+ private ScoringFilters scfilters;
+ private float scoreInjected;
+
+ public void configure(JobConf job) {
+ String mode = job.get("db.injected.reset.mask", "none");
+ List names = Arrays.asList(mode.toLowerCase().split("\\s"));
+ resetMode = RESET_NONE;
+ for (int i = 0; i < maskNames.length; i++) {
+ if (names.contains(maskNames[i])) resetMode |= masks[i];
+ }
+ scfilters = new ScoringFilters(job);
+ scoreInjected = job.getFloat("db.score.injected", 1.0f);
+ schedule = FetchScheduleFactory.getFetchSchedule(job);
+ }
+
public void close() {}
public void reduce(WritableComparable key, Iterator values,
OutputCollector output, Reporter reporter)
throws IOException {
-      output.collect(key, (Writable)values.next()); // just collect first value
+      // reset per-key state: the Reducer instance is reused across keys,
+      // so stale values from the previous key must not leak in here
+      injected = null;
+      existing = null;
+      // there can be at most one value with status != STATUS_INJECTED
+      // and we also use only one value with status == STATUS_INJECTED
+      while (values.hasNext()) {
+ CrawlDatum datum = (CrawlDatum)values.next();
+ if (datum.getStatus() != CrawlDatum.STATUS_INJECTED) {
+ existing = datum;
+ } else {
+ injected = datum;
+ }
+ }
+ // set the status properly
+      if (injected != null) injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
+
+ if (existing != null) {
+ if (injected == null) {
+ output.collect(key, existing); // no update
+ } else {
+ // check if we need to reset some values in the existing copy
+ if ((resetMode & RESET_SCORE) != 0) {
+ try {
+ scfilters.initialScore((UTF8)key, existing);
+ } catch (Exception e) {
+ LOG.warn("Couldn't filter initial score, key " + key + ":
" + e.getMessage());
+ existing.setScore(scoreInjected);
+ }
+ }
+ if ((resetMode & RESET_SCHEDULE) != 0) {
+ schedule.initializeSchedule((UTF8)key, existing);
+ }
+ if ((resetMode & RESET_METADATA) != 0) {
+ existing.setMetaData(new MapWritable());
+ }
+ output.collect(key, existing);
+ }
+ } else {
+ output.collect(key, injected);
+ }
}
}
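
For the record: the reset behavior is controlled entirely through the
configuration. A usage sketch (the property name and the mask values -
"none", "score", "schedule", "metadata", "all" - come straight from
InjectReducer.configure() above; the rest is just illustration):

<property>
  <name>db.injected.reset.mask</name>
  <!-- whitespace-separated list of masks: this one re-initializes the
       score and the fetch schedule of existing entries, but keeps
       their metadata intact -->
  <value>score schedule</value>
</property>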
--
Best regards,
Andrzej Bialecki <><
 ___. ___ ___ ___ _ _   __________________________________
[__ || __|__/|__||\/|  Information Retrieval, Semantic Web
___|||__||  \|  || |   Embedded Unix, System Integration
http://www.sigram.com Contact: info at sigram dot com