[jira] [Commented] (NUTCH-2631) KafkaIndexWriter

JIRA jira@apache.org

    [ https://issues.apache.org/jira/browse/NUTCH-2631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16571997#comment-16571997 ]

ASF GitHub Bot commented on NUTCH-2631:
---------------------------------------

AyalCiobotaru closed pull request #372: NUTCH-2631 KafkaIndexWriter
URL: https://github.com/apache/nutch/pull/372
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff after such a merge, it is reproduced below for
the sake of provenance:

diff --git a/build.xml b/build.xml
index 56be49533..70f97d075 100644
--- a/build.xml
+++ b/build.xml
@@ -189,6 +189,7 @@
       <packageset dir="${plugins.dir}/indexer-dummy/src/java"/>
       <packageset dir="${plugins.dir}/indexer-elastic-rest/src/java/"/>
       <packageset dir="${plugins.dir}/indexer-elastic/src/java/" />
+      <packageset dir="${plugins.dir}/indexer-kafka/src/java/" />
       <packageset dir="${plugins.dir}/indexer-rabbit/src/java"/>
       <packageset dir="${plugins.dir}/indexer-solr/src/java"/>
       <packageset dir="${plugins.dir}/language-identifier/src/java"/>
@@ -641,6 +642,7 @@
       <packageset dir="${plugins.dir}/indexer-dummy/src/java"/>
       <packageset dir="${plugins.dir}/indexer-elastic-rest/src/java/"/>
       <packageset dir="${plugins.dir}/indexer-elastic/src/java/" />
+      <packageset dir="${plugins.dir}/indexer-kafka/src/java/" />
       <packageset dir="${plugins.dir}/indexer-rabbit/src/java"/>
       <packageset dir="${plugins.dir}/indexer-solr/src/java"/>
       <packageset dir="${plugins.dir}/language-identifier/src/java"/>
@@ -1058,6 +1060,7 @@
         <source path="${plugins.dir}/indexer-elastic-rest/src/java/"/>
         <source path="${plugins.dir}/indexer-elastic/src/java/" />
         <source path="${plugins.dir}/indexer-elastic/src/test/" />
+        <source path="${plugins.dir}/indexer-kafka/src/java/" />
         <source path="${plugins.dir}/indexer-rabbit/src/java/" />
         <source path="${plugins.dir}/indexer-solr/src/java/" />
         <source path="${plugins.dir}/language-identifier/src/java/" />
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index 797e3485a..7b23f6b21 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -1955,6 +1955,26 @@ visit https://wiki.apache.org/nutch/SimilarityScoringFilter-->
   </description>
 </property>
 
+<!-- Kafka properties -->
+
+<property>
+  <name>kafka.host</name>
+  <value></value>
+  <description>Location of the host Kafka cluster to connect to using producerConfig</description>
+</property>
+
+<property>
+  <name>kafka.port</name>
+  <value></value>
+  <description>The port to connect to using the producerConfig</description>
+</property>
+
+<property>
+  <name>kafka.index</name>
+  <value></value>
+  <description>Default index to attach to documents</description>
+</property>
+
 <!-- Elasticsearch properties -->
 
 <property>
diff --git a/ivy/ivy.xml b/ivy/ivy.xml
index 2dbe58351..70995c41b 100644
--- a/ivy/ivy.xml
+++ b/ivy/ivy.xml
@@ -136,6 +136,10 @@
  <!-- RabbitMQ dependencies -->
  <dependency org="com.rabbitmq" name="amqp-client" rev="3.6.5" conf="*->default" />
 
+ <!-- Kafka Dependencies -->
+ <dependency org="org.apache.kafka" name="kafka_2.12" rev="1.1.0"/>
+ <dependency org="org.apache.kafka" name="connect-json" rev="1.1.0"/>
+
 
  <!--Added Because of Elasticsearch JEST client-->
  <!--TODO refactor these to indexer-elastic-rest plugin somehow, currently doesn't resolve correctly-->
diff --git a/src/plugin/build.xml b/src/plugin/build.xml
index 5052082cd..385f50393 100755
--- a/src/plugin/build.xml
+++ b/src/plugin/build.xml
@@ -50,6 +50,7 @@
     <ant dir="indexer-dummy" target="deploy"/>
     <ant dir="indexer-elastic" target="deploy"/>
     <ant dir="indexer-elastic-rest" target="deploy"/>
+    <ant dir="indexer-kafka" target="deploy" />
     <ant dir="indexer-rabbit" target="deploy"/>
     <ant dir="indexer-solr" target="deploy"/>
     <ant dir="language-identifier" target="deploy"/>
@@ -170,6 +171,7 @@
     <ant dir="indexer-dummy" target="clean"/>
     <ant dir="indexer-elastic" target="clean"/>
     <ant dir="indexer-elastic-rest" target="clean"/>
+    <ant dir="indexer-kafka" target="clean"/>
     <ant dir="indexer-rabbit" target="clean"/>
     <ant dir="indexer-solr" target="clean"/>
     <ant dir="language-identifier" target="clean"/>
diff --git a/src/plugin/indexer-kafka/build-ivy.xml b/src/plugin/indexer-kafka/build-ivy.xml
new file mode 100644
index 000000000..0932dfc8d
--- /dev/null
+++ b/src/plugin/indexer-kafka/build-ivy.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-kafka" default="deps-jar" xmlns:ivy="antlib:org.apache.ivy.ant">
+
+    <property name="ivy.install.version" value="2.1.0"/>
+    <condition property="ivy.home" value="${env.IVY_HOME}">
+        <isset property="env.IVY_HOME"/>
+    </condition>
+    <property name="ivy.home" value="${user.home}/.ant"/>
+    <property name="ivy.checksums" value=""/>
+    <property name="ivy.jar.dir" value="${ivy.home}/lib"/>
+    <property name="ivy.jar.file" value="${ivy.jar.dir}/ivy.jar"/>
+
+    <target name="download-ivy" unless="offline">
+
+        <mkdir dir="${ivy.jar.dir}"/>
+        <!-- download Ivy from web site so that it can be used even without any special installation -->
+        <get src="http://repo2.maven.org/maven2/org/apache/ivy/ivy/${ivy.install.version}/ivy-${ivy.install.version}.jar"
+             dest="${ivy.jar.file}" usetimestamp="true"/>
+    </target>
+
+    <target name="init-ivy" depends="download-ivy">
+        <!-- try to load ivy here from ivy home, in case the user has not already dropped
+                it into ant's lib dir (note that the latter copy will always take precedence).
+                We will not fail as long as local lib dir exists (it may be empty) and
+                ivy is in at least one of ant's lib dir or the local lib dir. -->
+        <path id="ivy.lib.path">
+            <fileset dir="${ivy.jar.dir}" includes="*.jar"/>
+
+        </path>
+        <taskdef resource="org/apache/ivy/ant/antlib.xml"
+                 uri="antlib:org.apache.ivy.ant" classpathref="ivy.lib.path"/>
+    </target>
+
+    <target name="deps-jar" depends="init-ivy">
+        <ivy:retrieve pattern="lib/[artifact]-[revision].[ext]"/>
+    </target>
+
+</project>
diff --git a/src/plugin/indexer-kafka/build.xml b/src/plugin/indexer-kafka/build.xml
new file mode 100644
index 000000000..c2f8078e9
--- /dev/null
+++ b/src/plugin/indexer-kafka/build.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-kafka" default="jar-core">
+
+    <import file="../build-plugin.xml"/>
+
+</project>
diff --git a/src/plugin/indexer-kafka/ivy.xml b/src/plugin/indexer-kafka/ivy.xml
new file mode 100644
index 000000000..f834fd22c
--- /dev/null
+++ b/src/plugin/indexer-kafka/ivy.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<ivy-module version="1.0">
+    <info organisation="org.apache.nutch" module="${ant.project.name}">
+        <license name="Apache 2.0"/>
+        <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/>
+        <description>
+            Apache Nutch
+        </description>
+    </info>
+
+    <configurations>
+        <include file="../../..//ivy/ivy-configurations.xml"/>
+    </configurations>
+
+    <publications>
+        <!--get the artifact from our module name-->
+        <artifact conf="master"/>
+    </publications>
+
+    <dependencies>
+        <!-- https://mvnrepository.com/artifact/io.searchbox/jest -->
+        <dependency org="io.searchbox" name="jest" rev="2.0.3" conf="*->default"/>
+    </dependencies>
+
+</ivy-module>
diff --git a/src/plugin/indexer-kafka/plugin.xml b/src/plugin/indexer-kafka/plugin.xml
new file mode 100644
index 000000000..34ea1b33c
--- /dev/null
+++ b/src/plugin/indexer-kafka/plugin.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<plugin id="indexer-kafka" name="KafkaIndexWriter" version="1.0.0"
+        provider-name="nutch.apache.org">
+
+    <runtime>
+        <library name="indexer-kafka.jar">
+            <export name="*"/>
+        </library>
+
+        <library name="commons-codec-1.9.jar"/>
+        <library name="commons-lang3-3.4.jar"/>
+        <library name="commons-logging-1.2.jar"/>
+        <library name="gson-2.6.2.jar"/>
+        <library name="guava-19.0.jar"/>
+        <library name="httpasyncclient-4.1.1.jar"/>
+        <library name="httpclient-4.5.2.jar"/>
+        <library name="httpcore-4.4.4.jar"/>
+        <library name="httpcore-nio-4.4.4.jar"/>
+        <library name="jest-2.0.3.jar"/>
+        <library name="jest-common-2.0.3.jar"/>
+
+    </runtime>
+
+    <requires>
+        <import plugin="nutch-extensionpoints"/>
+    </requires>
+
+    <extension id="org.apache.nutch.indexer.kafka"
+               name="Kafka Index Writer"
+               point="org.apache.nutch.indexer.IndexWriter">
+        <implementation id="KafkaIndexWriter"
+                        class="org.apache.nutch.indexwriter.kafka.KafkaIndexWriter"/>
+    </extension>
+
+</plugin>
diff --git a/src/plugin/indexer-kafka/src/java/org/apache/nutch/indexwriter/kafka/KafkaConstants.java b/src/plugin/indexer-kafka/src/java/org/apache/nutch/indexwriter/kafka/KafkaConstants.java
new file mode 100644
index 000000000..3b2af0543
--- /dev/null
+++ b/src/plugin/indexer-kafka/src/java/org/apache/nutch/indexwriter/kafka/KafkaConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.kafka;
+
+public interface KafkaConstants {
+  public static final String KAFKA_PREFIX = "kafka.";
+
+  public static final String HOST = KAFKA_PREFIX + "host";
+  public static final String PORT = KAFKA_PREFIX + "port";
+  public static final String INDEX = KAFKA_PREFIX + "index";
+
+  public static final String KEY_SERIALIZER = KAFKA_PREFIX + "key.serializer";
+  public static final String VALUE_SERIALIZER = KAFKA_PREFIX
+      + "value.serializer";
+  public static final String TOPIC = KAFKA_PREFIX + "topic";
+}
diff --git a/src/plugin/indexer-kafka/src/java/org/apache/nutch/indexwriter/kafka/KafkaIndexWriter.java b/src/plugin/indexer-kafka/src/java/org/apache/nutch/indexwriter/kafka/KafkaIndexWriter.java
new file mode 100644
index 000000000..446df9784
--- /dev/null
+++ b/src/plugin/indexer-kafka/src/java/org/apache/nutch/indexwriter/kafka/KafkaIndexWriter.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TODO refactor the dependencies out of root ivy file
+package org.apache.nutch.indexwriter.kafka;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.NutchDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Properties;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
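+/** Buffers each NutchDocument as a JSON record in write() and sends the
+ *  buffered records to the configured Kafka topic in commit(). */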
+public class KafkaIndexWriter implements IndexWriter {
+  public static Logger LOG = LoggerFactory.getLogger(KafkaIndexWriter.class);
+
+  private org.apache.kafka.clients.producer.Producer<String, JsonNode> producer;
+  private ProducerRecord<String, JsonNode> data;
+
+  private Configuration config;
+
+  private int port = -1;
+  private String host = null;
+  private String valueSerializer = null;
+  private String keySerializer = null;
+  private String topic = null;
+
+  private String jsonString = null;
+  private JsonNode json = null;
+
+  private List<ProducerRecord<String, JsonNode>> inputDocs = new ArrayList<ProducerRecord<String, JsonNode>>(
+      10);
+
+  @Override
+  public void open(JobConf job, String name) throws IOException {
+
+    host = job.get(KafkaConstants.HOST);
+    port = job.getInt(KafkaConstants.PORT, 9092);
+
+    keySerializer = job.get(KafkaConstants.KEY_SERIALIZER,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    valueSerializer = job.get(KafkaConstants.VALUE_SERIALIZER,
+        "org.apache.kafka.connect.json.JsonSerializer");
+    topic = job.get(KafkaConstants.TOPIC);
+
+    Properties configProperties = new Properties();
+    configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        host + ":" + port);
+    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        keySerializer);
+    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        valueSerializer);
+
+    producer = new KafkaProducer<String, JsonNode>(configProperties);
+  }
+
+  @Override
+  public void write(NutchDocument doc) throws IOException {
+
+    Map<String, Object> source = new HashMap<String, Object>();
+
+    // Loop through all fields of this doc
+    for (String fieldName : doc.getFieldNames()) {
+      Set<String> allFieldValues = new HashSet<String>();
+      for (Object value : doc.getField(fieldName).getValues()) {
+        allFieldValues.add(value.toString());
+      }
+      String[] fieldValues = allFieldValues
+          .toArray(new String[allFieldValues.size()]);
+      source.put(fieldName, fieldValues);
+    }
+    try {
+      jsonString = new ObjectMapper().writeValueAsString(source);
+      json = new ObjectMapper().readTree(jsonString);
+      data = new ProducerRecord<String, JsonNode>(topic, json);
+
+      inputDocs.add(data);
+    } catch (NullPointerException e) {
+      LOG.info("Data is empty, all messages have been sent");
+    }
+  }
+
+  @Override
+  public void delete(String key) throws IOException {
+    // Not applicable in Kafka
+  }
+
+  @Override
+  public void update(NutchDocument doc) throws IOException {
+    try {
+      write(doc);
+    } catch (IOException e) {
+      LOG.error(ExceptionUtils.getStackTrace(e));
+      throw e;
+    }
+  }
+
+  @Override
+  public void commit() throws IOException {
+    try {
+      for (ProducerRecord<String, JsonNode> datum : inputDocs) {
+        producer.send(datum);
+      }
+    } catch (NullPointerException e) {
+      LOG.info("All records have been sent to Kafka on topic {}", topic);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    commit();
+  }
+
+  @Override
+  public String describe() {
+    StringBuffer sb = new StringBuffer("KafkaIndexWriter\n");
+    sb.append("\t").append(KafkaConstants.HOST).append(" : hostname \n");
+    sb.append("\t").append(KafkaConstants.PORT).append(" : port \n");
+    sb.append("\t").append(KafkaConstants.INDEX)
+        .append(" : Kafka index command \n");
+    return sb.toString();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    config = conf;
+    String host = conf.get(KafkaConstants.HOST);
+    String port = conf.get(KafkaConstants.PORT);
+
+    if (StringUtils.isBlank(host) && StringUtils.isBlank(port)) {
+      String message = "Missing kafka.host and kafka.port. These should be set in nutch-site.xml ";
+      message += "\n" + describe();
+      LOG.error(message);
+      throw new RuntimeException(message);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return config;
+  }
+
+}
diff --git a/src/plugin/indexer-kafka/src/java/org/apache/nutch/indexwriter/kafka/package-info.java b/src/plugin/indexer-kafka/src/java/org/apache/nutch/indexwriter/kafka/package-info.java
new file mode 100644
index 000000000..9ebb7ad27
--- /dev/null
+++ b/src/plugin/indexer-kafka/src/java/org/apache/nutch/indexwriter/kafka/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexwriter.kafka;
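
The write() method in the diff above collapses every field of a document into a de-duplicated array of strings before serializing it to JSON. The following is a minimal sketch of that step using only the JDK. The class name FieldFlattener is hypothetical, a plain Map stands in for NutchDocument, and LinkedHashSet is swapped in for the patch's HashSet so that the order of values is predictable.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper for illustration; not part of the pull request. */
final class FieldFlattener {

  /**
   * Collapses each field's values into a de-duplicated String array.
   * A plain Map stands in for NutchDocument (an assumption of this sketch).
   */
  static Map<String, String[]> flatten(Map<String, List<Object>> fields) {
    Map<String, String[]> source = new LinkedHashMap<>();
    for (Map.Entry<String, List<Object>> field : fields.entrySet()) {
      // LinkedHashSet drops duplicates and keeps first-seen order;
      // the patch uses HashSet, which does not guarantee any order.
      Set<String> unique = new LinkedHashSet<>();
      for (Object value : field.getValue()) {
        // String.valueOf turns a null value into "null" instead of throwing.
        unique.add(String.valueOf(value));
      }
      source.put(field.getKey(), unique.toArray(new String[0]));
    }
    return source;
  }

  public static void main(String[] args) {
    Map<String, List<Object>> doc = new LinkedHashMap<>();
    doc.put("title", Arrays.<Object>asList("Apache Nutch"));
    doc.put("anchor", Arrays.<Object>asList("home", "home", "index"));
    for (Map.Entry<String, String[]> e : flatten(doc).entrySet()) {
      System.out.println(e.getKey() + " = " + Arrays.toString(e.getValue()));
    }
  }
}
```

The resulting map can be handed to a JSON serializer such as Jackson's ObjectMapper, which is what the patch does.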

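In the diff above, setConf() throws only when both kafka.host and kafka.port are blank, while open() already falls back to port 9092 when no port is configured. One possible stricter check, sketched here under the assumption that only the host is mandatory, builds the bootstrap address as follows. The class BootstrapAddress and its method are hypothetical names and do not appear in the patch.

```java
/** Hypothetical helper for illustration; not part of the pull request. */
final class BootstrapAddress {

  // Same default the patch passes to job.getInt(KafkaConstants.PORT, 9092).
  static final int DEFAULT_PORT = 9092;

  /**
   * Builds "host:port". A blank host is rejected; a blank port falls back
   * to DEFAULT_PORT. Treating the host as the only mandatory setting is an
   * assumption of this sketch.
   */
  static String of(String host, String port) {
    if (host == null || host.trim().isEmpty()) {
      throw new IllegalArgumentException("kafka.host must be set");
    }
    int resolvedPort = DEFAULT_PORT;
    if (port != null && !port.trim().isEmpty()) {
      // Throws NumberFormatException if the value is not numeric.
      resolvedPort = Integer.parseInt(port.trim());
    }
    return host.trim() + ":" + resolvedPort;
  }

  public static void main(String[] args) {
    System.out.println(of("localhost", ""));
    System.out.println(of("broker1", "9093"));
  }
}
```

The returned string has the host:port shape that the patch puts into ProducerConfig.BOOTSTRAP_SERVERS_CONFIG.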

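In the diff above, commit() sends every record held in inputDocs but never clears the list, and close() calls commit() again, so records that were already sent would be sent a second time. The sketch below shows one way to buffer and flush without resending. RecordBuffer is a hypothetical class, and a java.util.function.Consumer stands in for the Kafka producer's send call so that the example needs only the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper for illustration; not part of the pull request. */
final class RecordBuffer<T> {

  private final List<T> pending = new ArrayList<>();
  private final Consumer<T> sender; // stand-in for the producer's send call

  RecordBuffer(Consumer<T> sender) {
    this.sender = sender;
  }

  /** Queues a record until the next commit. */
  void add(T record) {
    pending.add(record);
  }

  /**
   * Sends every pending record once, then empties the buffer so that a
   * later commit (for example from close) does not resend them.
   * Returns the number of records sent. If the sender throws, the buffer
   * is left untouched in this sketch.
   */
  int commit() {
    int sent = 0;
    for (T record : pending) {
      sender.accept(record);
      sent++;
    }
    pending.clear();
    return sent;
  }

  public static void main(String[] args) {
    List<String> delivered = new ArrayList<>();
    RecordBuffer<String> buffer = new RecordBuffer<>(delivered::add);
    buffer.add("doc-1");
    buffer.add("doc-2");
    buffer.commit(); // sends both records
    buffer.commit(); // nothing left to send
    System.out.println(delivered);
  }
}
```

With a real producer, a flush after the loop would also be worth considering, since sends are asynchronous; that part is left out here.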
 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


> KafkaIndexWriter
> ----------------
>
>                 Key: NUTCH-2631
>                 URL: https://issues.apache.org/jira/browse/NUTCH-2631
>             Project: Nutch
>          Issue Type: Improvement
>          Components: indexer
>            Reporter: Ayal Ciobotaru
>            Priority: Minor
>              Labels: patch
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Nutch currently has no way to index directly into Kafka, which a fully message-based system controlled by Kafka requires. This patch adds a KafkaIndexWriter that produces the crawled documents into Kafka and lets Kafka distribute the messages as necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)