diff -crN --exclude=.svn nutch/src/java/org/apache/hadoop/extensions/FileOnlyPathFilter.java gnutch/org/apache/hadoop/extensions/FileOnlyPathFilter.java *** nutch/src/java/org/apache/hadoop/extensions/FileOnlyPathFilter.java 1969-12-31 19:00:00.000000000 -0500 --- gnutch/org/apache/hadoop/extensions/FileOnlyPathFilter.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 0 **** --- 1,28 ---- + package org.apache.hadoop.extensions; + + import java.io.IOException; + + import org.apache.hadoop.fs.PathFilter; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.fs.FileSystem; + + public class FileOnlyPathFilter implements PathFilter { + + protected FileSystem fs; + + public FileOnlyPathFilter(FileSystem _fs) + { + fs = _fs; + } + + public boolean accept(Path path) { + try + { + return fs.isFile(path); + } + catch(IOException e) + { + return false; + } + } + } \ No newline at end of file diff -crN --exclude=.svn nutch/src/java/org/apache/hadoop/extensions/FileOnlySequenceFileOutputFormat.java gnutch/org/apache/hadoop/extensions/FileOnlySequenceFileOutputFormat.java *** nutch/src/java/org/apache/hadoop/extensions/FileOnlySequenceFileOutputFormat.java 1969-12-31 19:00:00.000000000 -0500 --- gnutch/org/apache/hadoop/extensions/FileOnlySequenceFileOutputFormat.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 0 **** --- 1,40 ---- + package org.apache.hadoop.extensions; + + import java.io.IOException; + import java.util.Arrays; + + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.FileStatus; + import org.apache.hadoop.fs.Path; + + import org.apache.hadoop.io.SequenceFile; + import org.apache.hadoop.io.WritableComparable; + import org.apache.hadoop.io.Writable; + import org.apache.hadoop.io.SequenceFile.CompressionType; + import org.apache.hadoop.io.compress.CompressionCodec; + import org.apache.hadoop.io.compress.DefaultCodec; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.util.*; + + import org.apache.hadoop.mapred.SequenceFileOutputFormat; + import org.apache.hadoop.fs.FileUtil; + + import org.apache.hadoop.extensions.FileOnlyPathFilter; + + public class FileOnlySequenceFileOutputFormat extends SequenceFileOutputFormat { + public static SequenceFile.Reader[] getReaders(Configuration conf, Path dir) + throws IOException { + FileSystem fs = dir.getFileSystem(conf); + FileOnlyPathFilter filter = new FileOnlyPathFilter(fs); + Path[] names = FileUtil.stat2Paths(fs.listStatus(dir,filter)); + + // sort names, so that hash partitioning works + Arrays.sort(names); + + SequenceFile.Reader[] parts = new SequenceFile.Reader[names.length]; + for (int i = 0; i < names.length; i++) { + parts[i] = new SequenceFile.Reader(fs, names[i], conf); + } + return parts; + } + } \ No newline at end of file diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/CrawlDatum.java gnutch/org/apache/nutch/crawl/CrawlDatum.java *** nutch/src/java/org/apache/nutch/crawl/CrawlDatum.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/crawl/CrawlDatum.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 433,436 **** --- 433,440 ---- throw new RuntimeException(e); } } + + public CrawlDatum shallowCopy() { + return (CrawlDatum)clone(); + } } diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/CrawlDbFilter.java gnutch/org/apache/nutch/crawl/CrawlDbFilter.java *** nutch/src/java/org/apache/nutch/crawl/CrawlDbFilter.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/crawl/CrawlDbFilter.java 2008-06-05 
17:13:24.000000000 -0400 *************** *** 37,43 **** * * @author Andrzej Bialecki */ ! public class CrawlDbFilter implements Mapper { public static final String URL_FILTERING = "crawldb.url.filters"; public static final String URL_NORMALIZING = "crawldb.url.normalizers"; --- 37,43 ---- * * @author Andrzej Bialecki */ ! public class CrawlDbFilter implements Mapper { public static final String URL_FILTERING = "crawldb.url.filters"; public static final String URL_NORMALIZING = "crawldb.url.normalizers"; *************** *** 72,78 **** private Text newKey = new Text(); ! public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException { String url = key.toString(); if (urlNormalizers) { --- 72,78 ---- private Text newKey = new Text(); ! public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException { String url = key.toString(); if (urlNormalizers) { diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/CrawlDbMerger.java gnutch/org/apache/nutch/crawl/CrawlDbMerger.java *** nutch/src/java/org/apache/nutch/crawl/CrawlDbMerger.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/crawl/CrawlDbMerger.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 64,76 **** public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { ! CrawlDatum res = null; long resTime = 0L; meta.clear(); while (values.hasNext()) { CrawlDatum val = values.next(); if (res == null) { ! res = val; resTime = schedule.calculateLastFetchTime(res); meta.putAll(res.getMetaData()); continue; --- 64,76 ---- public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { ! CrawlDatum res = new CrawlDatum(); long resTime = 0L; meta.clear(); while (values.hasNext()) { CrawlDatum val = values.next(); if (res == null) { ! res.set(val); resTime = schedule.calculateLastFetchTime(res); meta.putAll(res.getMetaData()); continue; *************** *** 80,91 **** if (valTime > resTime) { // collect all metadata, newer values override older values meta.putAll(val.getMetaData()); ! res = val; resTime = valTime ; } else { // insert older metadata before newer val.getMetaData().putAll(meta); ! meta = val.getMetaData(); } } res.setMetaData(meta); --- 80,92 ---- if (valTime > resTime) { // collect all metadata, newer values override older values meta.putAll(val.getMetaData()); ! res.set(val); resTime = valTime ; } else { // insert older metadata before newer val.getMetaData().putAll(meta); ! meta.clear(); ! 
meta.putAll(val.getMetaData()); } } res.setMetaData(meta); diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/CrawlDbReader.java gnutch/org/apache/nutch/crawl/CrawlDbReader.java *** nutch/src/java/org/apache/nutch/crawl/CrawlDbReader.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/crawl/CrawlDbReader.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 49,55 **** import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; - import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.lib.HashPartitioner; import org.apache.hadoop.mapred.lib.IdentityMapper; --- 49,54 ---- *************** *** 59,64 **** --- 58,65 ---- import org.apache.nutch.util.NutchJob; import org.apache.nutch.util.StringUtil; + import org.apache.hadoop.extensions.FileOnlySequenceFileOutputFormat; + /** * Read utility for the CrawlDB. * *************** *** 307,313 **** job.setReducerClass(CrawlDbStatReducer.class); job.setOutputPath(tmpFolder); ! job.setOutputFormat(SequenceFileOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); --- 308,314 ---- job.setReducerClass(CrawlDbStatReducer.class); job.setOutputPath(tmpFolder); ! job.setOutputFormat(FileOnlySequenceFileOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); *************** *** 315,321 **** // reading the result FileSystem fileSystem = FileSystem.get(config); ! SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(config, tmpFolder); Text key = new Text(); LongWritable value = new LongWritable(); --- 316,322 ---- // reading the result FileSystem fileSystem = FileSystem.get(config); ! SequenceFile.Reader[] readers = FileOnlySequenceFileOutputFormat.getReaders(config, tmpFolder); Text key = new Text(); LongWritable value = new LongWritable(); *************** *** 435,441 **** job.setReducerClass(IdentityReducer.class); job.setOutputPath(tempDir); ! job.setOutputFormat(SequenceFileOutputFormat.class); job.setOutputKeyClass(FloatWritable.class); job.setOutputValueClass(Text.class); --- 436,442 ---- job.setReducerClass(IdentityReducer.class); job.setOutputPath(tempDir); ! job.setOutputFormat(FileOnlySequenceFileOutputFormat.class); job.setOutputKeyClass(FloatWritable.class); job.setOutputValueClass(Text.class); diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/CrawlDbReducer.java gnutch/org/apache/nutch/crawl/CrawlDbReducer.java *** nutch/src/java/org/apache/nutch/crawl/CrawlDbReducer.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/crawl/CrawlDbReducer.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 32,38 **** import org.apache.nutch.scoring.ScoringFilters; /** Merge new page entries with existing entries. */ ! public class CrawlDbReducer implements Reducer { public static final Log LOG = LogFactory.getLog(CrawlDbReducer.class); private int retryMax; --- 32,38 ---- import org.apache.nutch.scoring.ScoringFilters; /** Merge new page entries with existing entries. */ ! public class CrawlDbReducer implements Reducer { public static final Log LOG = LogFactory.getLog(CrawlDbReducer.class); private int retryMax; *************** *** 55,62 **** public void close() {} ! public void reduce(WritableComparable key, Iterator values, ! 
OutputCollector output, Reporter reporter) throws IOException { CrawlDatum fetch = null; --- 55,62 ---- public void close() {} ! public void reduce(WritableComparable key, Iterator values, ! OutputCollector output, Reporter reporter) throws IOException { CrawlDatum fetch = null; *************** *** 68,94 **** CrawlDatum datum = (CrawlDatum)values.next(); if (CrawlDatum.hasDbStatus(datum)) { if (old == null) { ! old = datum; } else { // always take the latest version ! if (old.getFetchTime() < datum.getFetchTime()) old = datum; } continue; } if (CrawlDatum.hasFetchStatus(datum)) { if (fetch == null) { ! fetch = datum; } else { // always take the latest version ! if (fetch.getFetchTime() < datum.getFetchTime()) fetch = datum; } continue; } switch (datum.getStatus()) { // collect other info case CrawlDatum.STATUS_LINKED: ! linked.add(datum); break; case CrawlDatum.STATUS_SIGNATURE: signature = datum.getSignature(); --- 68,94 ---- CrawlDatum datum = (CrawlDatum)values.next(); if (CrawlDatum.hasDbStatus(datum)) { if (old == null) { ! old = datum.shallowCopy(); } else { // always take the latest version ! if (old.getFetchTime() < datum.getFetchTime()) old = datum.shallowCopy(); } continue; } if (CrawlDatum.hasFetchStatus(datum)) { if (fetch == null) { ! fetch = datum.shallowCopy(); } else { // always take the latest version ! if (fetch.getFetchTime() < datum.getFetchTime()) fetch = datum.shallowCopy(); } continue; } switch (datum.getStatus()) { // collect other info case CrawlDatum.STATUS_LINKED: ! linked.add(datum.shallowCopy()); break; case CrawlDatum.STATUS_SIGNATURE: signature = datum.getSignature(); diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/Crawl.java gnutch/org/apache/nutch/crawl/Crawl.java *** nutch/src/java/org/apache/nutch/crawl/Crawl.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/crawl/Crawl.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 141,149 **** } // index, dedup & merge ! indexer.index(indexes, crawlDb, linkDb, fs.listPaths(segments, HadoopFSUtil.getPassAllFilter())); dedup.dedup(new Path[] { indexes }); ! merger.merge(fs.listPaths(indexes, HadoopFSUtil.getPassAllFilter()), index, tmpDir); } else { LOG.warn("No URLs to fetch - check your seed list and URL filters."); } --- 141,154 ---- } // index, dedup & merge ! Path[] paths = FileUtil.stat2Paths(fs.listStatus(segments, HadoopFSUtil.getPassAllFilter())); ! ! indexer.index(indexes, crawlDb, linkDb, paths); dedup.dedup(new Path[] { indexes }); ! ! ! paths = FileUtil.stat2Paths(fs.listStatus(segments, HadoopFSUtil.getPassAllFilter())); ! merger.merge(paths, index, tmpDir); } else { LOG.warn("No URLs to fetch - check your seed list and URL filters."); } diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/Generator.java gnutch/org/apache/nutch/crawl/Generator.java *** nutch/src/java/org/apache/nutch/crawl/Generator.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/crawl/Generator.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 30,35 **** --- 30,36 ---- import org.apache.hadoop.conf.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; + import org.apache.hadoop.extensions.FileOnlySequenceFileOutputFormat; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; *************** *** 341,362 **** } public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { ! CrawlDatum orig = null; ! 
LongWritable genTime = null; while (values.hasNext()) { CrawlDatum val = values.next(); if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) { ! genTime = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY); if (genTime.get() != generateTime) { ! orig = val; ! genTime = null; continue; } } else { ! orig = val; } } ! if (genTime != null) { orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime); } output.collect(key, orig); --- 342,363 ---- } public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { ! CrawlDatum orig = new CrawlDatum(); ! LongWritable genTime = new LongWritable(); while (values.hasNext()) { CrawlDatum val = values.next(); if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) { ! genTime.set(((LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY)).get()); if (genTime.get() != generateTime) { ! orig.set(val); ! genTime.set((long)0); continue; } } else { ! orig.set(val); } } ! if (genTime.get() != 0) { orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime); } output.collect(key, orig); *************** *** 448,454 **** job.setReducerClass(Selector.class); job.setOutputPath(tempDir); ! job.setOutputFormat(SequenceFileOutputFormat.class); job.setOutputKeyClass(FloatWritable.class); job.setOutputKeyComparatorClass(DecreasingFloatComparator.class); job.setOutputValueClass(SelectorEntry.class); --- 449,455 ---- job.setReducerClass(Selector.class); job.setOutputPath(tempDir); ! job.setOutputFormat(FileOnlySequenceFileOutputFormat.class); job.setOutputKeyClass(FloatWritable.class); job.setOutputKeyComparatorClass(DecreasingFloatComparator.class); job.setOutputValueClass(SelectorEntry.class); *************** *** 460,466 **** } // check that we selected at least some entries ... ! SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(job, tempDir); boolean empty = true; if (readers != null && readers.length > 0) { for (int num = 0; num < readers.length; num++) { --- 461,467 ---- } // check that we selected at least some entries ... ! SequenceFile.Reader[] readers = FileOnlySequenceFileOutputFormat.getReaders(job, tempDir); boolean empty = true; if (readers != null && readers.length > 0) { for (int num = 0; num < readers.length; num++) { diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/Injector.java gnutch/org/apache/nutch/crawl/Injector.java *** nutch/src/java/org/apache/nutch/crawl/Injector.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/crawl/Injector.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 72,78 **** url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT); url = filters.filter(url); // filter the url } catch (Exception e) { ! if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); } url = null; } if (url != null) { // if it passes --- 72,78 ---- url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT); url = filters.filter(url); // filter the url } catch (Exception e) { ! if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); } url = null; } if (url != null) { // if it passes *************** *** 102,122 **** public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { ! CrawlDatum old = null; ! CrawlDatum injected = null; while (values.hasNext()) { CrawlDatum val = values.next(); if (val.getStatus() == CrawlDatum.STATUS_INJECTED) { ! injected = val; ! 
injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); } else { ! old = val; } } CrawlDatum res = null; ! if (old != null) res = old; // don't overwrite existing value ! else res = injected; ! output.collect(key, res); } } --- 102,129 ---- public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { ! ! CrawlDatum old = new CrawlDatum(); ! CrawlDatum injected = new CrawlDatum(); ! ! boolean oldNull = false; ! while (values.hasNext()) { CrawlDatum val = values.next(); if (val.getStatus() == CrawlDatum.STATUS_INJECTED) { ! injected.set(val); ! injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); } else { ! old.set(val); ! oldNull = true; } } CrawlDatum res = null; ! if (oldNull) { ! res = old; // don't overwrite existing value ! } else { ! res = injected; ! } output.collect(key, res); } } *************** *** 144,149 **** --- 151,157 ---- if (LOG.isInfoEnabled()) { LOG.info("Injector: Converting injected urls to crawl db entries."); } + JobConf sortJob = new NutchJob(getConf()); sortJob.setJobName("inject " + urlDir); sortJob.setInputPath(urlDir); diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/Inlink.java gnutch/org/apache/nutch/crawl/Inlink.java *** nutch/src/java/org/apache/nutch/crawl/Inlink.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/crawl/Inlink.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 33,38 **** --- 33,43 ---- this.anchor = anchor; } + public Inlink(Inlink i) { + this.fromUrl = i.getFromUrl(); + this.anchor = i.getAnchor(); + } + public void readFields(DataInput in) throws IOException { fromUrl = Text.readString(in); anchor = Text.readString(in); diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/LinkDbFilter.java gnutch/org/apache/nutch/crawl/LinkDbFilter.java *** nutch/src/java/org/apache/nutch/crawl/LinkDbFilter.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/crawl/LinkDbFilter.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 38,44 **** * * @author Andrzej Bialecki */ ! public class LinkDbFilter implements Mapper { public static final String URL_FILTERING = "linkdb.url.filters"; public static final String URL_NORMALIZING = "linkdb.url.normalizer"; --- 38,44 ---- * * @author Andrzej Bialecki */ ! public class LinkDbFilter implements Mapper { public static final String URL_FILTERING = "linkdb.url.filters"; public static final String URL_NORMALIZING = "linkdb.url.normalizer"; *************** *** 73,79 **** public void close() {} ! public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException { String url = key.toString(); Inlinks result = new Inlinks(); if (normalize) { --- 73,79 ---- public void close() {} ! public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException { String url = key.toString(); Inlinks result = new Inlinks(); if (normalize) { diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/LinkDbMerger.java gnutch/org/apache/nutch/crawl/LinkDbMerger.java *** nutch/src/java/org/apache/nutch/crawl/LinkDbMerger.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/crawl/LinkDbMerger.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 83,94 **** Iterator it = inlinks.iterator(); int i = 0; while(it.hasNext() && i++ < end) { ! 
result.add(it.next()); } } if (result.size() == 0) return; output.collect(key, result); - } public void configure(JobConf job) { --- 83,93 ---- Iterator it = inlinks.iterator(); int i = 0; while(it.hasNext() && i++ < end) { ! result.add(new Inlink(it.next())); } } if (result.size() == 0) return; output.collect(key, result); } public void configure(JobConf job) { diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/crawl/OldReducer gnutch/org/apache/nutch/crawl/OldReducer *** nutch/src/java/org/apache/nutch/crawl/OldReducer 1969-12-31 19:00:00.000000000 -0500 --- gnutch/org/apache/nutch/crawl/OldReducer 2008-06-05 17:13:24.000000000 -0400 *************** *** 0 **** --- 1,255 ---- + /** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.nutch.crawl; + + import java.util.ArrayList; + import java.util.Iterator; + import java.io.IOException; + + // Commons Logging imports + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; + + import org.apache.hadoop.io.*; + import org.apache.hadoop.mapred.*; + import org.apache.nutch.metadata.Nutch; + import org.apache.nutch.scoring.ScoringFilterException; + import org.apache.nutch.scoring.ScoringFilters; + + /** Merge new page entries with existing entries. 
*/ + public class CrawlDbReducer implements Reducer { + public static final Log LOG = LogFactory.getLog(CrawlDbReducer.class); + + private int retryMax; + private CrawlDatum result = new CrawlDatum(); + private ArrayList linked = new ArrayList(); + private ScoringFilters scfilters = null; + private boolean additionsAllowed; + private int maxInterval; + private FetchSchedule schedule; + + public void configure(JobConf job) { + retryMax = job.getInt("db.fetch.retry.max", 3); + scfilters = new ScoringFilters(job); + additionsAllowed = job.getBoolean(CrawlDb.CRAWLDB_ADDITIONS_ALLOWED, true); + int oldMaxInterval = job.getInt("db.max.fetch.interval", 0); + maxInterval = job.getInt("db.fetch.interval.max", 0 ); + if (oldMaxInterval > 0 && maxInterval == 0) maxInterval = oldMaxInterval * FetchSchedule.SECONDS_PER_DAY; + schedule = FetchScheduleFactory.getFetchSchedule(job); + } + + public void close() {} + + public void reduce(WritableComparable key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + + CrawlDatum fetch = new CrawlDatum(); + CrawlDatum old = new CrawlDatum(); + + boolean fetchNull = true; + boolean oldNull = true; + + byte[] signature = null; + linked.clear(); + + while (values.hasNext()) { + CrawlDatum datum = (CrawlDatum)values.next(); + if (CrawlDatum.hasDbStatus(datum)) { + if (oldNull) { + old.set(datum); + oldNull = false; + } else { + // always take the latest version + if (old.getFetchTime() < datum.getFetchTime()) old.set(datum); + } + continue; + } + + if (CrawlDatum.hasFetchStatus(datum)) { + if (fetchNull) { + fetch.set(datum); + fetchNull = false; + } else { + // always take the latest version + if (fetch.getFetchTime() < datum.getFetchTime()) fetch.set(datum); + } + continue; + } + + switch (datum.getStatus()) { // collect other info + case CrawlDatum.STATUS_LINKED: + linked.add(datum); + break; + case CrawlDatum.STATUS_SIGNATURE: + signature = datum.getSignature(); + break; + default: + LOG.warn("Unknown status, key: " + key + ", datum: " + datum); + } + } + + // if it doesn't already exist, skip it + if (oldNull && !additionsAllowed) return; + + // if there is no fetched datum, perhaps there is a link + if (fetchNull && linked.size() > 0) { + fetch = linked.get(0); + } + + // still no new data - record only unchanged old data, if exists, and return + if (fetchNull) { + if (!oldNull) // at this point at least "old" should be present + output.collect(key, old); + else + LOG.warn("Missing fetch and old value, signature=" + signature); + return; + } + + if (signature == null) signature = fetch.getSignature(); + long prevModifiedTime = !oldNull ? old.getModifiedTime() : 0L; + long prevFetchTime = !oldNull ? 
old.getFetchTime() : 0L; + + // initialize with the latest version, be it fetch or link + result.set(fetch); + if (!oldNull) { + // copy metadata from old, if exists + if (old.getMetaData().size() > 0) { + result.getMetaData().putAll(old.getMetaData()); + // overlay with new, if any + if (fetch.getMetaData().size() > 0) + result.getMetaData().putAll(fetch.getMetaData()); + } + // set the most recent valid value of modifiedTime + if (old.getModifiedTime() > 0 && fetch.getModifiedTime() == 0) { + result.setModifiedTime(old.getModifiedTime()); + } + } + + switch (fetch.getStatus()) { // determine new status + + case CrawlDatum.STATUS_LINKED: // it was link + if (!oldNull) { // if old exists + result.set(old); // use it + } else { + result = schedule.initializeSchedule((Text)key, result); + result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); + try { + scfilters.initialScore((Text)key, result); + } catch (ScoringFilterException e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Cannot filter init score for url " + key + + ", using default: " + e.getMessage()); + } + result.setScore(0.0f); + } + } + break; + + case CrawlDatum.STATUS_FETCH_SUCCESS: // succesful fetch + case CrawlDatum.STATUS_FETCH_REDIR_TEMP: // successful fetch, redirected + case CrawlDatum.STATUS_FETCH_REDIR_PERM: + case CrawlDatum.STATUS_FETCH_NOTMODIFIED: // successful fetch, notmodified + // determine the modification status + int modified = FetchSchedule.STATUS_UNKNOWN; + if (fetch.getStatus() == CrawlDatum.STATUS_FETCH_NOTMODIFIED) { + modified = FetchSchedule.STATUS_NOTMODIFIED; + } else { + if ((!oldNull) && (old.getSignature() != null) && (signature != null)) { + if (SignatureComparator._compare(old.getSignature(), signature) != 0) { + modified = FetchSchedule.STATUS_MODIFIED; + } else { + modified = FetchSchedule.STATUS_NOTMODIFIED; + } + } + } + // set the schedule + result = schedule.setFetchSchedule((Text)key, result, prevFetchTime, + prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(), modified); + // set the result status and signature + if (modified == FetchSchedule.STATUS_NOTMODIFIED) { + result.setStatus(CrawlDatum.STATUS_DB_NOTMODIFIED); + if (!oldNull) result.setSignature(old.getSignature()); + } else { + switch (fetch.getStatus()) { + case CrawlDatum.STATUS_FETCH_SUCCESS: + result.setStatus(CrawlDatum.STATUS_DB_FETCHED); + break; + case CrawlDatum.STATUS_FETCH_REDIR_PERM: + result.setStatus(CrawlDatum.STATUS_DB_REDIR_PERM); + break; + case CrawlDatum.STATUS_FETCH_REDIR_TEMP: + result.setStatus(CrawlDatum.STATUS_DB_REDIR_TEMP); + break; + default: + LOG.warn("Unexpected status: " + fetch.getStatus() + " resetting to old status."); + if (!oldNull) result.setStatus(old.getStatus()); + else result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); + } + result.setSignature(signature); + } + // if fetchInterval is larger than the system-wide maximum, trigger + // an unconditional recrawl. This prevents the page to be stuck at + // NOTMODIFIED state, when the old fetched copy was already removed with + // old segments. 
+ if (maxInterval < result.getFetchInterval()) + result = schedule.forceRefetch((Text)key, result, false); + break; + case CrawlDatum.STATUS_SIGNATURE: + if (LOG.isWarnEnabled()) { + LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: " + key); + } + return; + case CrawlDatum.STATUS_FETCH_RETRY: // temporary failure + if (!oldNull) { + result.setSignature(old.getSignature()); // use old signature + } + result = schedule.setPageRetrySchedule((Text)key, result, prevFetchTime, + prevModifiedTime, fetch.getFetchTime()); + if (result.getRetriesSinceFetch() < retryMax) { + result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); + } else { + result.setStatus(CrawlDatum.STATUS_DB_GONE); + } + break; + + case CrawlDatum.STATUS_FETCH_GONE: // permanent failure + if (!oldNull) + result.setSignature(old.getSignature()); // use old signature + result.setStatus(CrawlDatum.STATUS_DB_GONE); + result = schedule.setPageGoneSchedule((Text)key, result, prevFetchTime, + prevModifiedTime, fetch.getFetchTime()); + break; + + default: + throw new RuntimeException("Unknown status: " + fetch.getStatus() + " " + key); + } + + try { + scfilters.updateDbScore((Text)key, old, result, linked); + } catch (Exception e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Couldn't update score, key=" + key + ": " + e); + } + } + // remove generation time, if any + result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY); + output.collect(key, result); + } + + } diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/extensions/CrawlTask.java gnutch/org/apache/nutch/extensions/CrawlTask.java *** nutch/src/java/org/apache/nutch/extensions/CrawlTask.java 1969-12-31 19:00:00.000000000 -0500 --- gnutch/org/apache/nutch/extensions/CrawlTask.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 0 **** --- 1,134 ---- + /** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.nutch.extensions; + + import java.util.*; + import java.text.*; + + // Commons Logging imports + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; + + import org.apache.nutch.crawl.Crawl; + import org.apache.nutch.crawl.CrawlDb; + import org.apache.nutch.crawl.Generator; + import org.apache.nutch.crawl.Injector; + + import org.apache.nutch.fetcher.Fetcher; + import org.apache.hadoop.fs.*; + import org.apache.hadoop.conf.*; + import org.apache.hadoop.mapred.*; + import org.apache.nutch.parse.ParseSegment; + import org.apache.nutch.indexer.DeleteDuplicates; + import org.apache.nutch.indexer.IndexMerger; + import org.apache.nutch.indexer.Indexer; + import org.apache.nutch.util.HadoopFSUtil; + import org.apache.nutch.util.NutchConfiguration; + import org.apache.nutch.util.NutchJob; + + import org.apache.hadoop.fs.FileUtil; + import org.apache.hadoop.extensions.FileOnlyPathFilter; + + public class CrawlTask { + public static final Log LOG = LogFactory.getLog(Crawl.class); + + private static String getDate() { + return new SimpleDateFormat("yyyyMMddHHmmss").format + (new Date(System.currentTimeMillis())); + } + + + /* Perform complete crawling */ + public static void main(String args[]) throws Exception { + if (args.length < 1) { + System.out.println + ("Usage: CrawlTask [-dir d] [-threads n] [-depth i] [-topN N]"); + return; + } + + Configuration conf = NutchConfiguration.create(); + conf.addResource("crawl-tool.xml"); + JobConf job = new NutchJob(conf); + + Path rootUrlDir = null; + Path dir = new Path("crawl-" + getDate()); + int threads = job.getInt("fetcher.threads.fetch", 10); + int depth = 5; + long topN = Long.MAX_VALUE; + + for (int i = 0; i < args.length; i++) { + if ("-dir".equals(args[i])) { + dir = new Path(args[i+1]); + i++; + } else if ("-threads".equals(args[i])) { + threads = Integer.parseInt(args[i+1]); + i++; + } else if ("-depth".equals(args[i])) { + depth = Integer.parseInt(args[i+1]); + i++; + } else if ("-topN".equals(args[i])) { + topN = Integer.parseInt(args[i+1]); + i++; + } else if (args[i] != null) { + rootUrlDir = new Path(args[i]); + } + } + + FileSystem fs = FileSystem.get(job); + + if (LOG.isInfoEnabled()) { + LOG.info("crawl started in: " + dir); + LOG.info("rootUrlDir = " + rootUrlDir); + LOG.info("threads = " + threads); + LOG.info("depth = " + depth); + if (topN != Long.MAX_VALUE) + LOG.info("topN = " + topN); + } + + Path crawlDb = new Path(dir + "/crawldb"); + Path linkDb = new Path(dir + "/linkdb"); + Path segments = new Path(dir + "/segments"); + Path indexes = new Path(dir + "/indexes"); + Path index = new Path(dir + "/index"); + + Path tmpDir = job.getLocalPath("crawl"+Path.SEPARATOR+getDate()); + Injector injector = new Injector(conf); + Generator generator = new Generator(conf); + Fetcher fetcher = new Fetcher(conf); + ParseSegment parseSegment = new ParseSegment(conf); + CrawlDb crawlDbTool = new CrawlDb(conf); + + // initialize crawlDb + injector.inject(crawlDb, rootUrlDir); + int i; + for (i = 0; i < depth; i++) { // generate new segment + Path segment = generator.generate(crawlDb, segments, -1, topN, System + .currentTimeMillis(),false,false); + if (segment == null) { + LOG.info("Stopping at depth=" + i + " - no more URLs to fetch."); + break; + } + fetcher.fetch(segment, threads); // fetch it + if (!Fetcher.isParsing(job)) { + parseSegment.parse(segment); // parse it, if needed + } + crawlDbTool.update(crawlDb, new Path[]{segment}, true, false); // update crawldb + } + if 
(LOG.isInfoEnabled()) { LOG.info("crawl finished: " + dir); } + } + } diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/extensions/GLog.java gnutch/org/apache/nutch/extensions/GLog.java *** nutch/src/java/org/apache/nutch/extensions/GLog.java 1969-12-31 19:00:00.000000000 -0500 --- gnutch/org/apache/nutch/extensions/GLog.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 0 **** --- 1,39 ---- + package org.apache.nutch.extensions; + + import java.io.*; + + public class GLog + { + protected static GLog g; + protected BufferedWriter b; + protected Boolean locked; + + protected GLog() throws IOException + { + locked = false; + b = new BufferedWriter(new FileWriter("glog")); + log("LOG START\n\n"); + } + + public void finalize() throws IOException + { + b.close(); + } + + public void log(String s) throws IOException + { + String s2 = "------------------------------------------------\n" + s + "\n"; + b.write(s2,0,s2.length()); + b.flush(); + } + + public static GLog getGLog() + { + try { + if( GLog.g == null ) { + GLog.g = new GLog(); + } + return GLog.g; + } catch(IOException e) { return null; } + } + } \ No newline at end of file diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/fetcher/Fetcher.java gnutch/org/apache/nutch/fetcher/Fetcher.java *** nutch/src/java/org/apache/nutch/fetcher/Fetcher.java 2008-06-05 18:37:11.000000000 -0400 --- gnutch/org/apache/nutch/fetcher/Fetcher.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 42,47 **** --- 42,48 ---- import org.apache.nutch.scoring.ScoringFilters; import org.apache.nutch.util.*; + // import org.apache.nutch.extensions.GLog; /** The fetcher. Most of the work is done by plugins. */ public class Fetcher extends Configured implements Tool, MapRunnable { *************** *** 109,121 **** } public void run() { ! synchronized (Fetcher.this) {activeThreads++;} // count threads try { Text key = new Text(); CrawlDatum datum = new CrawlDatum(); ! while (true) { // TODO : NUTCH-258 ... // If something bad happened, then exit // if (conf.getBoolean("fetcher.exit", false)) { --- 110,124 ---- } public void run() { ! int n; ! synchronized (Fetcher.this) { n = activeThreads++;} // count threads try { Text key = new Text(); CrawlDatum datum = new CrawlDatum(); !
//GLog.log("Number of ActiveThreads: " + String.valueOf(n)); while (true) { + //GLog.log("Top of While loop for thread: " + String.valueOf(n)); // TODO : NUTCH-258 ... // If something bad happened, then exit // if (conf.getBoolean("fetcher.exit", false)) { *************** *** 151,157 **** try { if (LOG.isInfoEnabled()) { LOG.info("fetching " + url); } ! // fetch the page redirectCount = 0; do { --- 154,160 ---- try { if (LOG.isInfoEnabled()) { LOG.info("fetching " + url); } ! //GLog.log("Fetching: " + url.toString()); // fetch the page redirectCount = 0; do { *************** *** 171,176 **** --- 174,181 ---- new Text(reprUrl)); } + //GLog.log("status.getCode(): " + status.getCode().toString()); + switch(status.getCode()) { case ProtocolStatus.SUCCESS: // got a page diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java gnutch/org/apache/nutch/fetcher/FetcherOutputFormat.java *** nutch/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java 2008-06-05 18:37:11.000000000 -0400 --- gnutch/org/apache/nutch/fetcher/FetcherOutputFormat.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 42,55 **** import org.apache.nutch.protocol.Content; /** Splits FetcherOutput entries into multiple map files. */ ! public class FetcherOutputFormat implements OutputFormat { public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException { if (fs.exists(new Path(job.getOutputPath(), CrawlDatum.FETCH_DIR_NAME))) throw new IOException("Segment already fetched!"); } ! public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException { --- 42,55 ---- import org.apache.nutch.protocol.Content; /** Splits FetcherOutput entries into multiple map files. */ ! public class FetcherOutputFormat implements OutputFormat { public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException { if (fs.exists(new Path(job.getOutputPath(), CrawlDatum.FETCH_DIR_NAME))) throw new IOException("Segment already fetched!"); } ! public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException { *************** *** 65,73 **** new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class, compType, progress); ! return new RecordWriter() { private MapFile.Writer contentOut; ! private RecordWriter parseOut; { if (Fetcher.isStoringContent(job)) { --- 65,73 ---- new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class, compType, progress); ! return new RecordWriter() { private MapFile.Writer contentOut; ! private RecordWriter parseOut; { if (Fetcher.isStoringContent(job)) { diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/indexer/DeleteDuplicates.java gnutch/org/apache/nutch/indexer/DeleteDuplicates.java *** nutch/src/java/org/apache/nutch/indexer/DeleteDuplicates.java 2008-06-05 18:37:12.000000000 -0400 --- gnutch/org/apache/nutch/indexer/DeleteDuplicates.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 74,80 **** // partition by md5 // reduce, deleting all but with highest score (or shortest url). ! public static class IndexDoc implements WritableComparable { private Text url = new Text(); private int urlLen; private float score; --- 74,80 ---- // partition by md5 // reduce, deleting all but with highest score (or shortest url). ! 
public static class IndexDoc implements Cloneable, WritableComparable { private Text url = new Text(); private int urlLen; private float score; *************** *** 89,94 **** --- 89,117 ---- + ",hash=" + hash + ",index=" + index + ",doc=" + doc + ",keep=" + keep + "]"; } + + // Create a new shallow copy + protected Object clone() + { + try + { + return super.clone(); + } + catch(CloneNotSupportedException e) + { + return null; + } + } + + public IndexDoc copyConstructor() + { + IndexDoc i = (IndexDoc)clone(); + i.url = new Text(url); + i.hash = new MD5Hash(); + i.hash.set(hash); + i.index = new Text(index); + return i; + } public void write(DataOutput out) throws IOException { url.write(out); *************** *** 269,275 **** while (values.hasNext()) { IndexDoc value = values.next(); if (latest == null) { ! latest = value; continue; } if (value.time > latest.time) { --- 292,298 ---- while (values.hasNext()) { IndexDoc value = values.next(); if (latest == null) { ! latest = value.copyConstructor(); continue; } if (value.time > latest.time) { *************** *** 277,283 **** latest.keep = false; LOG.debug("-discard " + latest + ", keep " + value); output.collect(latest.hash, latest); ! latest = value; } else { // discard value.keep = false; --- 300,306 ---- latest.keep = false; LOG.debug("-discard " + latest + ", keep " + value); output.collect(latest.hash, latest); ! latest = value.copyConstructor(); } else { // discard value.keep = false; *************** *** 314,331 **** continue; } if (highest == null) { ! highest = value; continue; } IndexDoc toDelete = null, toKeep = null; boolean metric = byScore ? (value.score > highest.score) : (value.urlLen < highest.urlLen); if (metric) { ! toDelete = highest; ! toKeep = value; } else { ! toDelete = value; ! toKeep = highest; } if (LOG.isDebugEnabled()) { --- 337,354 ---- continue; } if (highest == null) { ! highest = value.copyConstructor(); continue; } IndexDoc toDelete = null, toKeep = null; boolean metric = byScore ? (value.score > highest.score) : (value.urlLen < highest.urlLen); if (metric) { ! toDelete = highest; ! toKeep = value; } else { ! toDelete = value; ! toKeep = highest; } if (LOG.isDebugEnabled()) { *************** *** 334,340 **** toDelete.keep = false; output.collect(toDelete.url, toDelete); ! highest = toKeep; } LOG.debug("-keep " + highest); // no need to add this - in phase 2 we only process docs to delete them --- 357,363 ---- toDelete.keep = false; output.collect(toDelete.url, toDelete); ! highest = toKeep.copyConstructor(); } LOG.debug("-keep " + highest); // no need to add this - in phase 2 we only process docs to delete them diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/indexer/FsDirectory.java gnutch/org/apache/nutch/indexer/FsDirectory.java *** nutch/src/java/org/apache/nutch/indexer/FsDirectory.java 2008-06-05 18:37:12.000000000 -0400 --- gnutch/org/apache/nutch/indexer/FsDirectory.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 54,73 **** throw new IOException(directory + " not a directory"); // clear old files ! Path[] files = fs.listPaths(directory, HadoopFSUtil.getPassAllFilter()); for (int i = 0; i < files.length; i++) { ! if (!fs.delete(files[i])) throw new IOException("Cannot delete " + files[i]); } } public String[] list() throws IOException { ! Path[] files = fs.listPaths(directory, HadoopFSUtil.getPassAllFilter()); if (files == null) return null; String[] result = new String[files.length]; for (int i = 0; i < files.length; i++) { ! 
result[i] = files[i].getName(); } return result; } --- 54,73 ---- throw new IOException(directory + " not a directory"); // clear old files ! FileStatus[] files = fs.listStatus(directory, HadoopFSUtil.getPassAllFilter()); for (int i = 0; i < files.length; i++) { ! if (!fs.delete(files[i].getPath(),false)) throw new IOException("Cannot delete " + files[i]); } } public String[] list() throws IOException { ! FileStatus[] files = fs.listStatus(directory, HadoopFSUtil.getPassAllFilter()); if (files == null) return null; String[] result = new String[files.length]; for (int i = 0; i < files.length; i++) { ! result[i] = files[i].getPath().getName(); } return result; } diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/indexer/IndexMerger.java gnutch/org/apache/nutch/indexer/IndexMerger.java *** nutch/src/java/org/apache/nutch/indexer/IndexMerger.java 2008-06-05 18:37:12.000000000 -0400 --- gnutch/org/apache/nutch/indexer/IndexMerger.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 135,141 **** Path outputIndex = new Path(args[i++]); for (; i < args.length; i++) { ! indexDirs.addAll(Arrays.asList(fs.listPaths(new Path(args[i]), HadoopFSUtil.getPassAllFilter()))); } // --- 135,145 ---- Path outputIndex = new Path(args[i++]); for (; i < args.length; i++) { ! FileStatus[] fStati = fs.listStatus(new Path(args[i]), HadoopFSUtil.getPassAllFilter()); ! for(int j = 0;j < fStati.length;j++) ! { ! indexDirs.add(fStati[j].getPath()); ! } } // diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/parse/ParseOutputFormat.java gnutch/org/apache/nutch/parse/ParseOutputFormat.java *** nutch/src/java/org/apache/nutch/parse/ParseOutputFormat.java 2008-06-05 18:37:14.000000000 -0400 --- gnutch/org/apache/nutch/parse/ParseOutputFormat.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 44,50 **** import org.apache.hadoop.util.Progressable; /* Parse content in a segment. */ ! public class ParseOutputFormat implements OutputFormat { private static final Log LOG = LogFactory.getLog(ParseOutputFormat.class); private URLFilters filters; --- 44,50 ---- import org.apache.hadoop.util.Progressable; /* Parse content in a segment. */ ! public class ParseOutputFormat implements OutputFormat { private static final Log LOG = LogFactory.getLog(ParseOutputFormat.class); private URLFilters filters; *************** *** 79,85 **** throw new IOException("Segment already parsed!"); } ! public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress) throws IOException { this.filters = new URLFilters(job); --- 79,85 ---- throw new IOException("Segment already parsed!"); } ! public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress) throws IOException { this.filters = new URLFilters(job); *************** *** 111,117 **** SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class, compType, progress); ! return new RecordWriter() { public void write(WritableComparable key, Writable value) --- 111,117 ---- SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class, compType, progress); ! 
return new RecordWriter() { public void write(WritableComparable key, Writable value) diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/searcher/FetchedSegments.java gnutch/org/apache/nutch/searcher/FetchedSegments.java *** nutch/src/java/org/apache/nutch/searcher/FetchedSegments.java 2008-06-05 18:37:12.000000000 -0400 --- gnutch/org/apache/nutch/searcher/FetchedSegments.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 116,127 **** /** Construct given a directory containing fetcher output. */ public FetchedSegments(FileSystem fs, String segmentsDir, Configuration conf) throws IOException { ! Path[] segmentDirs = fs.listPaths(new Path(segmentsDir)); this.summarizer = new SummarizerFactory(conf).getSummarizer(); if (segmentDirs != null) { for (int i = 0; i < segmentDirs.length; i++) { ! Path segmentDir = segmentDirs[i]; // Path indexdone = new Path(segmentDir, IndexSegment.DONE_NAME); // if (fs.exists(indexdone) && fs.isFile(indexdone)) { // segments.put(segmentDir.getName(), new Segment(fs, segmentDir)); --- 116,127 ---- /** Construct given a directory containing fetcher output. */ public FetchedSegments(FileSystem fs, String segmentsDir, Configuration conf) throws IOException { ! FileStatus[] segmentDirs = fs.listStatus(new Path(segmentsDir)); this.summarizer = new SummarizerFactory(conf).getSummarizer(); if (segmentDirs != null) { for (int i = 0; i < segmentDirs.length; i++) { ! Path segmentDir = segmentDirs[i].getPath(); // Path indexdone = new Path(segmentDir, IndexSegment.DONE_NAME); // if (fs.exists(indexdone) && fs.isFile(indexdone)) { // segments.put(segmentDir.getName(), new Segment(fs, segmentDir)); diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/searcher/NutchBean.java gnutch/org/apache/nutch/searcher/NutchBean.java *** nutch/src/java/org/apache/nutch/searcher/NutchBean.java 2008-06-05 18:37:12.000000000 -0400 --- gnutch/org/apache/nutch/searcher/NutchBean.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 122,137 **** } Vector vDirs=new Vector(); ! Path [] directories = fs.listPaths(indexesDir, HadoopFSUtil.getPassDirectoriesFilter(fs)); ! for(int i = 0; i < directories.length; i++) { ! Path indexdone = new Path(directories[i], Indexer.DONE_NAME); if(fs.isFile(indexdone)) { ! vDirs.add(directories[i]); } } ! directories = new Path[ vDirs.size() ]; for(int i = 0; vDirs.size()>0; i++) { directories[i]=(Path)vDirs.remove(0); } --- 122,137 ---- } Vector vDirs=new Vector(); ! FileStatus[] fStati = fs.listStatus(indexesDir, HadoopFSUtil.getPassDirectoriesFilter(fs)); ! for(int i = 0; i < fStati.length; i++) { ! Path indexdone = new Path(fStati[i].getPath(), Indexer.DONE_NAME); if(fs.isFile(indexdone)) { ! vDirs.add(fStati[i].getPath()); } } ! 
Path[] directories = new Path[ vDirs.size() ]; for(int i = 0; vDirs.size()>0; i++) { directories[i]=(Path)vDirs.remove(0); } diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/segment/SegmentMerger.java gnutch/org/apache/nutch/segment/SegmentMerger.java *** nutch/src/java/org/apache/nutch/segment/SegmentMerger.java 2008-06-05 18:37:13.000000000 -0400 --- gnutch/org/apache/nutch/segment/SegmentMerger.java 2008-06-05 17:13:24.000000000 -0400 *************** *** 28,33 **** --- 28,34 ---- import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.SequenceFile; *************** *** 110,116 **** * * @author Andrzej Bialecki */ ! public class SegmentMerger extends Configured implements Mapper, Reducer { private static final Log LOG = LogFactory.getLog(SegmentMerger.class); private static final String SEGMENT_PART_KEY = "part"; --- 111,117 ---- * * @author Andrzej Bialecki */ ! public class SegmentMerger extends Configured implements Mapper, Reducer { private static final Log LOG = LogFactory.getLog(SegmentMerger.class); private static final String SEGMENT_PART_KEY = "part"; *************** *** 178,185 **** private static final String DEFAULT_SLICE = "default"; @Override ! public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException { ! return new RecordWriter() { MapFile.Writer c_out = null; MapFile.Writer f_out = null; MapFile.Writer pd_out = null; --- 179,186 ---- private static final String DEFAULT_SLICE = "default"; @Override ! public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException { ! return new RecordWriter() { MapFile.Writer c_out = null; MapFile.Writer f_out = null; MapFile.Writer pd_out = null; *************** *** 312,318 **** private Text newKey = new Text(); ! public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException { String url = key.toString(); if (normalizers != null) { try { --- 313,319 ---- private Text newKey = new Text(); ! public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException { String url = key.toString(); if (normalizers != null) { try { *************** *** 342,348 **** * important that segments be named in an increasing lexicographic order as * their creation time increases. */ ! public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { CrawlDatum lastG = null; CrawlDatum lastF = null; CrawlDatum lastSig = null; --- 343,349 ---- * important that segments be named in an increasing lexicographic order as * their creation time increases. */ ! public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { CrawlDatum lastG = null; CrawlDatum lastF = null; CrawlDatum lastSig = null; *************** *** 358,364 **** TreeMap> linked = new TreeMap>(); while (values.hasNext()) { ! MetaWrapper wrapper = (MetaWrapper)values.next(); Object o = wrapper.get(); String spString = wrapper.getMeta(SEGMENT_PART_KEY); if (spString == null) { --- 359,365 ---- TreeMap> linked = new TreeMap>(); while (values.hasNext()) { ! 
diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/segment/SegmentMerger.java gnutch/org/apache/nutch/segment/SegmentMerger.java
*** nutch/src/java/org/apache/nutch/segment/SegmentMerger.java 2008-06-05 18:37:13.000000000 -0400
--- gnutch/org/apache/nutch/segment/SegmentMerger.java 2008-06-05 17:13:24.000000000 -0400
***************
*** 28,33 ****
--- 28,34 ----
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.MapFile;
  import org.apache.hadoop.io.SequenceFile;
***************
*** 110,116 ****
   *
   * @author Andrzej Bialecki
   */
! public class SegmentMerger extends Configured implements Mapper, Reducer {
    private static final Log LOG = LogFactory.getLog(SegmentMerger.class);
  
    private static final String SEGMENT_PART_KEY = "part";
--- 111,117 ----
   *
   * @author Andrzej Bialecki
   */
! public class SegmentMerger extends Configured implements Mapper, Reducer {
    private static final Log LOG = LogFactory.getLog(SegmentMerger.class);
  
    private static final String SEGMENT_PART_KEY = "part";
***************
*** 178,185 ****
    private static final String DEFAULT_SLICE = "default";
  
    @Override
!   public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException {
!     return new RecordWriter() {
        MapFile.Writer c_out = null;
        MapFile.Writer f_out = null;
        MapFile.Writer pd_out = null;
--- 179,186 ----
    private static final String DEFAULT_SLICE = "default";
  
    @Override
!   public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException {
!     return new RecordWriter() {
        MapFile.Writer c_out = null;
        MapFile.Writer f_out = null;
        MapFile.Writer pd_out = null;
***************
*** 312,318 ****
    private Text newKey = new Text();
  
!   public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
      String url = key.toString();
      if (normalizers != null) {
        try {
--- 313,319 ----
    private Text newKey = new Text();
  
!   public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
      String url = key.toString();
      if (normalizers != null) {
        try {
***************
*** 342,348 ****
   * important that segments be named in an increasing lexicographic order as
   * their creation time increases.
   */
!   public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
      CrawlDatum lastG = null;
      CrawlDatum lastF = null;
      CrawlDatum lastSig = null;
--- 343,349 ----
   * important that segments be named in an increasing lexicographic order as
   * their creation time increases.
   */
!   public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
      CrawlDatum lastG = null;
      CrawlDatum lastF = null;
      CrawlDatum lastSig = null;
***************
*** 358,364 ****
      TreeMap> linked = new TreeMap>();
      while (values.hasNext()) {
!       MetaWrapper wrapper = (MetaWrapper)values.next();
        Object o = wrapper.get();
        String spString = wrapper.getMeta(SEGMENT_PART_KEY);
        if (spString == null) {
--- 359,365 ----
      TreeMap> linked = new TreeMap>();
      while (values.hasNext()) {
!       MetaWrapper wrapper = values.next();
        Object o = wrapper.get();
        String spString = wrapper.getMeta(SEGMENT_PART_KEY);
        if (spString == null) {
***************
*** 626,634 ****
      boolean normalize = false;
      for (int i = 1; i < args.length; i++) {
        if (args[i].equals("-dir")) {
!         Path[] files = fs.listPaths(new Path(args[++i]), HadoopFSUtil.getPassDirectoriesFilter(fs));
          for (int j = 0; j < files.length; j++)
!           segs.add(files[j]);
        } else if (args[i].equals("-filter")) {
          filter = true;
        } else if (args[i].equals("-normalize")) {
--- 627,635 ----
      boolean normalize = false;
      for (int i = 1; i < args.length; i++) {
        if (args[i].equals("-dir")) {
!         FileStatus[] files = fs.listStatus(new Path(args[++i]), HadoopFSUtil.getPassDirectoriesFilter(fs));
          for (int j = 0; j < files.length; j++)
!           segs.add(files[j].getPath());
        } else if (args[i].equals("-filter")) {
          filter = true;
        } else if (args[i].equals("-normalize")) {
diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/segment/SegmentReader.java gnutch/org/apache/nutch/segment/SegmentReader.java
*** nutch/src/java/org/apache/nutch/segment/SegmentReader.java 2008-06-05 18:37:13.000000000 -0400
--- gnutch/org/apache/nutch/segment/SegmentReader.java 2008-06-05 17:13:24.000000000 -0400
***************
*** 40,45 ****
--- 40,46 ----
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.io.MapFile;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Text;
***************
*** 69,75 ****
  import org.apache.nutch.util.NutchJob;
  
  /** Dump the content of a segment. */
! public class SegmentReader extends Configured implements Reducer {
  
    public static final Log LOG = LogFactory.getLog(SegmentReader.class);
--- 70,76 ----
  import org.apache.nutch.util.NutchJob;
  
  /** Dump the content of a segment. */
! public class SegmentReader extends Configured implements Reducer {
  
    public static final Log LOG = LogFactory.getLog(SegmentReader.class);
***************
*** 78,87 ****
    private boolean co, fe, ge, pa, pd, pt;
    private FileSystem fs;
  
!   public static class InputCompatMapper extends MapReduceBase implements Mapper {
      private Text newKey = new Text();
  
!     public void map(WritableComparable key, Writable value, OutputCollector collector, Reporter reporter) throws IOException {
        // convert on the fly from old formats with UTF8 keys
        if (key instanceof UTF8) {
          newKey.set(key.toString());
--- 79,88 ----
    private boolean co, fe, ge, pa, pd, pt;
    private FileSystem fs;
  
!   public static class InputCompatMapper extends MapReduceBase implements Mapper {
      private Text newKey = new Text();
  
!     public void map(WritableComparable key, Writable value, OutputCollector collector, Reporter reporter) throws IOException {
        // convert on the fly from old formats with UTF8 keys
        if (key instanceof UTF8) {
          newKey.set(key.toString());
***************
*** 102,108 ****
        if (fs.exists(segmentDumpFile)) fs.delete(segmentDumpFile);
  
        final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));
!       return new RecordWriter() {
          public synchronized void write(WritableComparable key, Writable value) throws IOException {
            printStream.println(value);
          }
--- 103,109 ----
        if (fs.exists(segmentDumpFile)) fs.delete(segmentDumpFile);
  
        final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));
!       return new RecordWriter() {
          public synchronized void write(WritableComparable key, Writable value) throws IOException {
            printStream.println(value);
          }
***************
*** 162,175 ****
      public void close() {}
  
!     public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
        StringBuffer dump = new StringBuffer();
  
        dump.append("\nRecno:: ").append(recNo++).append("\n");
        dump.append("URL:: " + key.toString() + "\n");
        while (values.hasNext()) {
!         Writable value = ((NutchWritable) values.next()).get(); // unwrap
          if (value instanceof CrawlDatum) {
            dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString());
          } else if (value instanceof Content) {
--- 163,176 ----
      public void close() {}
  
!     public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
        StringBuffer dump = new StringBuffer();
  
        dump.append("\nRecno:: ").append(recNo++).append("\n");
        dump.append("URL:: " + key.toString() + "\n");
        while (values.hasNext()) {
!         Writable value = values.next().get(); // unwrap
          if (value instanceof CrawlDatum) {
            dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString());
          } else if (value instanceof Content) {
***************
*** 220,226 ****
      // remove the old file
      fs.delete(dumpFile);
  
!     Path[] files = fs.listPaths(tempDir, HadoopFSUtil.getPassAllFilter());
  
      PrintWriter writer = null;
      int currentRecordNumber = 0;
--- 221,227 ----
      // remove the old file
      fs.delete(dumpFile);
  
!     FileStatus[] files = fs.listStatus(tempDir, HadoopFSUtil.getPassAllFilter());
  
      PrintWriter writer = null;
      int currentRecordNumber = 0;
***************
*** 228,234 ****
      writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(fs.create(dumpFile))));
      try {
        for (int i = 0; i < files.length; i++) {
!         Path partFile = (Path) files[i];
          try {
            currentRecordNumber = append(fs, job, partFile, writer, currentRecordNumber);
          } catch (IOException exception) {
--- 229,235 ----
      writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(fs.create(dumpFile))));
      try {
        for (int i = 0; i < files.length; i++) {
!         Path partFile = files[i].getPath();
          try {
            currentRecordNumber = append(fs, job, partFile, writer, currentRecordNumber);
          } catch (IOException exception) {
***************
*** 559,567 ****
        if (args[i] == null) continue;
        if (args[i].equals("-dir")) {
          Path dir = new Path(args[++i]);
!         Path[] files = fs.listPaths(dir, HadoopFSUtil.getPassDirectoriesFilter(fs));
          if (files != null && files.length > 0) {
!           dirs.addAll(Arrays.asList(files));
          }
        } else dirs.add(new Path(args[i]));
      }
--- 560,570 ----
        if (args[i] == null) continue;
        if (args[i].equals("-dir")) {
          Path dir = new Path(args[++i]);
!         FileStatus[] files = fs.listStatus(dir, HadoopFSUtil.getPassDirectoriesFilter(fs));
          if (files != null && files.length > 0) {
!           for(int j = 0;j < files.length;j++) {
!             dirs.add(files[j].getPath());
!           }
          }
        } else dirs.add(new Path(args[i]));
      }
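The SegmentReader hunks above keep returning an anonymous text-dump RecordWriter from getRecordWriter(). For reference, a rough named-class sketch of that shape under the old org.apache.hadoop.mapred API; the class name and the raw WritableComparable/Writable type parameters are assumptions made for illustration, not something taken from the patch:

import java.io.IOException;
import java.io.PrintStream;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch, not part of the patch: a RecordWriter that ignores
// the key and prints each value on its own line, as the anonymous writer
// above does with its PrintStream.
public class TextDumpWriter implements RecordWriter<WritableComparable, Writable> {
  private final PrintStream out;

  public TextDumpWriter(PrintStream out) {
    this.out = out;
  }

  public synchronized void write(WritableComparable key, Writable value) throws IOException {
    out.println(value);            // one record per line, value only
  }

  public synchronized void close(Reporter reporter) throws IOException {
    out.close();
  }
}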
diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/tools/arc/ArcRecordReader.java gnutch/org/apache/nutch/tools/arc/ArcRecordReader.java
*** nutch/src/java/org/apache/nutch/tools/arc/ArcRecordReader.java 2008-06-05 18:37:12.000000000 -0400
--- gnutch/org/apache/nutch/tools/arc/ArcRecordReader.java 2008-06-05 17:13:24.000000000 -0400
***************
*** 49,56 ****
   * @see http://www.archive.org/
   * @see http://www.grub.org/
   */
! public class ArcRecordReader
!   implements RecordReader {
  
    public static final Log LOG = LogFactory.getLog(ArcRecordReader.class);
--- 49,55 ----
   * @see http://www.archive.org/
   * @see http://www.grub.org/
   */
! public class ArcRecordReader implements RecordReader {
  
    public static final Log LOG = LogFactory.getLog(ArcRecordReader.class);
diff -crN --exclude=.svn nutch/src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java gnutch/org/apache/nutch/tools/compat/CrawlDbConverter.java
*** nutch/src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java 2008-06-05 18:37:12.000000000 -0400
--- gnutch/org/apache/nutch/tools/compat/CrawlDbConverter.java 2008-06-05 17:13:24.000000000 -0400
***************
*** 37,43 ****
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.mapred.SequenceFileInputFormat;
  import org.apache.hadoop.util.StringUtils;
! import org.apache.hadoop.util.ToolBase;
  import org.apache.nutch.crawl.CrawlDatum;
  import org.apache.nutch.crawl.CrawlDb;
  import org.apache.nutch.crawl.MapWritable;
--- 37,44 ----
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.mapred.SequenceFileInputFormat;
  import org.apache.hadoop.util.StringUtils;
! import org.apache.hadoop.util.Tool;
! import org.apache.hadoop.conf.Configured;
  import org.apache.nutch.crawl.CrawlDatum;
  import org.apache.nutch.crawl.CrawlDb;
  import org.apache.nutch.crawl.MapWritable;
***************
*** 52,58 ****
   *
   * @author Andrzej Bialecki
   */
! public class CrawlDbConverter extends ToolBase implements Mapper {
    private static final Log LOG = LogFactory.getLog(CrawlDbConverter.class);
  
    private static final String CONVERT_META_KEY = "db.converter.with.metadata";
--- 53,59 ----
   *
   * @author Andrzej Bialecki
   */
! public class CrawlDbConverter extends Configured implements Tool, Mapper {
    private static final Log LOG = LogFactory.getLog(CrawlDbConverter.class);
  
    private static final String CONVERT_META_KEY = "db.converter.with.metadata";
***************
*** 66,72 ****
      newKey = new Text();
    }
  
!   public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
      newKey.set(key.toString());
      if (withMetadata) {
--- 67,73 ----
      newKey = new Text();
    }
  
!   public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
      newKey.set(key.toString());
      if (withMetadata) {
***************
*** 97,103 ****
     * @param args
     */
    public static void main(String[] args) throws Exception {
!     int res = new CrawlDbConverter().doMain(NutchConfiguration.create(), args);
    }
  
    public int run(String[] args) throws Exception {
--- 98,104 ----
     * @param args
     */
    public static void main(String[] args) throws Exception {
!     int res = new CrawlDbConverter().run(args);
    }
  
    public int run(String[] args) throws Exception {
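The CrawlDbConverter hunks replace the old ToolBase base class with the Configured/Tool pair, and the patched main() then calls run(args) directly. A minimal sketch of the more usual driver wiring through ToolRunner under the same Hadoop API; ExampleTool and its empty job body are illustrative assumptions, not code from the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Illustrative driver skeleton, not part of the patch: Configured supplies
// getConf()/setConf(), Tool supplies run(String[]), and ToolRunner parses
// generic options and injects the Configuration before invoking run().
public class ExampleTool extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    Configuration conf = getConf();   // set by ToolRunner before run() is called
    // ... build and submit a JobConf here ...
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ExampleTool(), args);
    System.exit(res);
  }
}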