up 0 down

Итак, у меня есть два класса: один отвечает за посев (инъекцию начальных URL-адресов), а другой — за обход (краулинг).

ESSeedInjector Класс:

public class ESSeedInjector extends ConfigurableTopology {

public static void main(String[] args) {
    // Hard-coded launch arguments: seed directory ".", seed file "seeds.txt",
    // local mode, ES configuration file, and a 5 s sleep between submissions.
    String[] launchArgs = {".", "seeds.txt", "-local", "-conf", "es-conf.yaml", "--sleep", "5000"};
    ConfigurableTopology.start(new ESSeedInjector(), launchArgs);
}

@Override
public int run(String[] args) {

    // Build and submit the seed-injection topology:
    // file spout -> URL filter -> status updater (writes DISCOVERED URLs to ES).
    //
    // BUG FIX: the original guard only checked args.length == 0, yet both
    // args[0] (seed directory) and args[1] (file filter) are read below, so a
    // single-argument invocation crashed with ArrayIndexOutOfBoundsException
    // instead of printing the usage message. Require both arguments.
    if (args.length < 2) {
        System.err.println("ESSeedInjector seed_dir file_filter");
        return -1;
    }

    conf.setDebug(true);

    TopologyBuilder builder = new TopologyBuilder();

    // Each line of the seed file is parsed as "URL <TAB> metadata" and the
    // resulting tuple is given the DISCOVERED status.
    Scheme scheme = new StringTabScheme(Status.DISCOVERED);

    builder.setSpout("spout", new FileSpout(args[0], args[1], scheme));

    Fields key = new Fields("url");

    // Group by URL so duplicates of the same URL always reach the same
    // filter instance.
    builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("spout",
            key);

    // 10 parallel updaters persisting the filtered URLs into the ES status index.
    builder.setBolt("enqueue", new StatusUpdaterBolt(), 10)
            .customGrouping("filter", new URLStreamGrouping());

    return submit("ESSeedInjector", conf, builder);
}

Класс краулера (ESCrawlTopology):

public class ESCrawlTopology extends ConfigurableTopology {

    public static void main(String[] args) {
        ConfigurableTopology.start(new ESCrawlTopology(), new String[]{"-conf", "es-conf.yaml", "-local"});
    }

    /**
     * Wires the crawl pipeline:
     * spout -> partitioner -> fetch -> sitemap -> parse -> indexer,
     * with every stage also feeding status updates back into ES
     * and deletions routed to a dedicated bolt.
     */
    @Override
    protected int run(String[] args) {
        int workerCount = ConfUtils.getInt(getConf(), "topology.workers", 1);
        int shardCount = 1;

        // URLs with the same "url" field value must land on the same
        // status-updater instance.
        Fields urlField = new Fields("url");

        TopologyBuilder builder = new TopologyBuilder();

        // One spout instance per ES shard.
        builder.setSpout("spout", new CollapsingSpout(), shardCount);

        builder.setBolt("status_metrics", new StatusMetricsBolt())
                .shuffleGrouping("spout");

        builder.setBolt("partitioner", new URLPartitionerBolt(), workerCount)
                .shuffleGrouping("spout");

        // Fetcher is partitioned by the key emitted by the partitioner
        // (typically the host) so politeness limits are respected.
        builder.setBolt("fetch", new FetcherBolt(), workerCount).fieldsGrouping(
                "partitioner", new Fields("key"));

        builder.setBolt("sitemap", new SiteMapParserBolt(), workerCount)
                .localOrShuffleGrouping("fetch");

        builder.setBolt("parse", new JSoupParserBolt(), workerCount)
                .localOrShuffleGrouping("sitemap");

        builder.setBolt("indexer", new IndexerBolt(), workerCount)
                .localOrShuffleGrouping("parse");

        // Collect status updates from every stage on the dedicated status stream.
        builder.setBolt("status", new StatusUpdaterBolt(), workerCount)
                .fieldsGrouping("fetch", Constants.StatusStreamName, urlField)
                .fieldsGrouping("sitemap", Constants.StatusStreamName, urlField)
                .fieldsGrouping("parse", Constants.StatusStreamName, urlField)
                .fieldsGrouping("indexer", Constants.StatusStreamName, urlField);

        builder.setBolt("deleter", new DeletionBolt(), workerCount)
                .localOrShuffleGrouping("status",
                        Constants.DELETION_STREAM_NAME);

        conf.registerMetricsConsumer(MetricsConsumer.class);
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);

        return submit("crawl", conf, builder);
    }
}

Последовательность действий такая:

Запускаю класс ESSeedInjector (он успешно внедряет URL-адреса).

Запускаю класс краулера (ESCrawlTopology).

Обход начинается, но в произвольный момент времени возникает следующая ошибка.

18892 [elasticsearch[_client_][listener][T#2]] ERROR c.d.s.e.p.CollapsingSpout -  Exception with ES query
org.elasticsearch.transport.RemoteTransportException: [2rbuRko][127.0.0.1:9300][indices:data/read/search]
Caused by: org.elasticsearch.transport.RemoteTransportException: [2rbuRko][127.0.0.1:9300][indices:data/read/msearch]
Caused by: java.lang.IllegalArgumentException: Validation Failed: 1: no requests added;
    at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:29) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.search.MultiSearchRequest.validate(MultiSearchRequest.java:90) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:131) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:64) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:54) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:69) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.sendLocalRequest(TransportService.java:621) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.access$000(TransportService.java:73) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService$3.sendRequest(TransportService.java:133) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:569) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:502) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.sendChildRequest(TransportService.java:529) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.transport.TransportService.sendChildRequest(TransportService.java:520) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.search.SearchTransportService.sendExecuteMultiSearch(SearchTransportService.java:182) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.search.ExpandSearchPhase.run(ExpandSearchPhase.java:93) ~[?:?]
    at org.elasticsearch.action.search.AbstractSearchAsyncAction.executePhase(AbstractSearchAsyncAction.java:144) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:138) ~[elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.action.search.FetchSearchPhase.moveToNextPhase(FetchSearchPhase.java:207) ~[?:?]
    at org.elasticsearch.action.search.FetchSearchPhase.lambda$innerRun$2(FetchSearchPhase.java:105) ~[?:?]
    at org.elasticsearch.action.search.FetchSearchPhase.innerRun(FetchSearchPhase.java:117) ~[?:?]
    at org.elasticsearch.action.search.FetchSearchPhase.access$000(FetchSearchPhase.java:45) ~[?:?]
    at org.elasticsearch.action.search.FetchSearchPhase$1.doRun(FetchSearchPhase.java:87) ~[?:?]
    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:638) [elasticsearch-5.3.0.jar:5.3.0]
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-5.3.0.jar:5.3.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]

Не уверен, что вызывает ошибку, но я заметил закономерность: если стереть данные из ElasticSearch (запустив ESIndexInit), затем выполнить ESSeedInjector, а затем ESCrawlTopology, исключение возникает очень рано в процессе обхода (сразу после разбора начальной гиперссылки).

Однако если снова запустить ESCrawlTopology (ничего больше не меняя), исключение всё равно возникает, но значительно позже.

EDIT: когда я заменяю CollapsingSpout() на AggregationSpout(), в журнале появляется следующее.

15409 [elasticsearch[_client_][listener][T#1]] INFO  c.d.s.e.p.AggregationSpout -  ES query returned 0 hits from 0 buckets in 2 msec with 0 already being processed

После этого ничего больше не обрабатывается и не индексируется в ES.