diff --git a/Dockerfile.frontier b/Dockerfile.frontier index e0b09d4b8..8bafba5a0 100644 --- a/Dockerfile.frontier +++ b/Dockerfile.frontier @@ -3,7 +3,7 @@ FROM openjdk:8u151-jdk RUN apt-get update && apt-get install -y netcat COPY ./squirrel.frontier/target/squirrel.frontier.jar /data/squirrel/squirrel.jar -COPY ./spring-config/default-config.xml /data/squirrel/default-config.xml +COPY ./spring-config/ /data/squirrel/spring-config WORKDIR /data/squirrel #ADD entrypoint.sh /entrypoint.sh @@ -11,4 +11,4 @@ WORKDIR /data/squirrel VOLUME ["/var/squirrel/data"] -CMD java -cp squirrel.jar:. org.hobbit.core.run.ComponentStarter org.aksw.simba.squirrel.components.FrontierComponent +CMD java -cp squirrel.jar:. org.hobbit.core.run.ComponentStarter org.dice_research.squirrel.components.FrontierComponent diff --git a/Dockerfile.web b/Dockerfile.web new file mode 100644 index 000000000..3ceddfed8 --- /dev/null +++ b/Dockerfile.web @@ -0,0 +1,16 @@ +FROM openjdk:8u151-jdk + +RUN apt-get update && apt-get install -y netcat + +COPY ./squirrel.web/target/squirrel.web.jar /data/squirrel/squirrel.web.jar +COPY ./squirrel.web/target/squirrel.web.jar.original /data/squirrel/squirrel.web.jar.original +COPY ./squirrel.web/WEB-INF /data/squirrel/WEB-INF +WORKDIR /data/squirrel + +#ADD entrypoint.sh /entrypoint.sh +#RUN chmod +x /entrypoint.sh + +VOLUME ["/var/squirrel/data"] + +CMD java -cp squirrel.web.jar:. com.squirrel.Application + diff --git a/Dockerfile.worker b/Dockerfile.worker index e7c536e6b..2d405901b 100644 --- a/Dockerfile.worker +++ b/Dockerfile.worker @@ -3,7 +3,7 @@ FROM openjdk:8u151-jdk RUN apt-get update && apt-get install -y netcat COPY ./squirrel.worker/target/squirrel.worker.jar /data/squirrel/squirrel.jar -COPY ./spring-config/default-config.xml /data/squirrel/default-config.xml +COPY ./spring-config /data/squirrel/spring-config WORKDIR /data/squirrel #ADD entrypoint.sh /entrypoint.sh diff --git a/Makefile b/Makefile index 4fa1ea933..cd800ee53 100644 --- a/Makefile +++ b/Makefile @@ -1,12 +1,13 @@ default: build build: - docker-compose -f docker-compose-sparql.yml down + docker-compose -f docker-compose.yml down mvn clean install -U -DskipTests -Dmaven.javadoc.skip=true dockerize: docker build -f Dockerfile.frontier -t squirrel.frontier . docker build -f Dockerfile.worker -t squirrel.worker . + docker build -f Dockerfile.web -t squirrel.web . start: dockerize docker-compose -f docker-compose-sparql.yml up diff --git a/build-squirrel b/build-squirrel new file mode 100755 index 000000000..ace055933 --- /dev/null +++ b/build-squirrel @@ -0,0 +1,15 @@ +#!/bin/bash +echo "Building Squirrel..." +cd squirrel.web-api +mvn clean install +cd .. +mvn clean install -DskipTests +clear +echo "Creating Frontier image..." +docker build -f Dockerfile.frontier -t squirrel.frontier . +echo "Creating Worker image..." +docker build -f Dockerfile.worker -t squirrel.worker . +echo "Creating Web image..." +docker build -f Dockerfile.web -t squirrel.web . 
+clear +echo "Finished" diff --git a/docker-compose-sparql.yml b/docker-compose-sparql.yml index 2bd39f1c4..3dde7d9a1 100644 --- a/docker-compose-sparql.yml +++ b/docker-compose-sparql.yml @@ -130,8 +130,8 @@ services: DEDUPLICATION_ACTIVE: "true" HOBBIT_RABBIT_HOST: rabbit OUTPUT_FOLDER: /var/squirrel/data - RDB_HOST_NAME: rethinkdb - RDB_PORT: 28015 + MDB_HOST_NAME: mongodb + MDB_PORT: 27017 SPARQL_HOST_NAME: sparqlhost SPARQL_HOST_PORT: 3030 SERVICE_PRECONDITION: "rethinkdb:28015 rabbit:5672" diff --git a/docker-compose-sparql-web.yml b/docker-compose-web.yml similarity index 85% rename from docker-compose-sparql-web.yml rename to docker-compose-web.yml index 8426312c2..0136bed32 100644 --- a/docker-compose-sparql-web.yml +++ b/docker-compose-web.yml @@ -19,14 +19,14 @@ services: # ports: frontier: - image: squirrel:latest + image: squirrel.frontier:latest container_name: frontier environment: - HOBBIT_RABBIT_HOST=rabbit - SEED_FILE=/var/squirrel/seeds.txt - URI_WHITELIST_FILE=/var/squirrel/whitelist.txt - - RDB_HOST_NAME=rethinkdb - - RDB_PORT=28015 + - MDB_HOST_NAME=mongodb + - MDB_PORT=27017 - COMMUNICATION_WITH_WEBSERVICE=true - VISUALIZATION_OF_CRAWLED_GRAPH=true volumes: @@ -36,10 +36,13 @@ services: command: java -cp squirrel.jar org.hobbit.core.run.ComponentStarter org.dice_research.squirrel.components.FrontierComponent web: - image: squirrel/webimage:latest + image: squirrel.web:latest container_name: web + environment: + - HOST=rabbit ports: - "8080:8080" + command: java -jar squirrel.web.jar sparqlhost: image: stain/jena-fuseki @@ -70,19 +73,19 @@ services: - "5672:5672" worker1: - image: squirrel:latest + image: squirrel.worker:latest container_name: worker1 environment: - HOBBIT_RABBIT_HOST=rabbit - OUTPUT_FOLDER=/var/squirrel/data - HTML_SCRAPER_YAML_PATH=/var/squirrel/yaml - - CONTEXT_CONFIG_FILE=/var/squirrel/spring-config/context-sparqlStoreBased.xml + - CONTEXT_CONFIG_FILE=/var/squirrel/spring-config/context.xml - SPARQL_HOST_NAME=sparqlhost #- CKAN_WHITELIST_FILE=/var/squirrel/ckanwhitelist.txt - SPARQL_HOST_PORT=3030 - DEDUPLICATION_ACTIVE=true - - RDB_HOST_NAME=rethinkdb - - RDB_PORT=28015 + - MDB_HOST_NAME=mongodb + - MDB_PORT=27017 #-CKAN_PORT= volumes: - ./data/worker1:/var/squirrel/data @@ -92,19 +95,19 @@ services: command: java -cp squirrel.jar org.dice_research.squirrel.components.WorkerComponentStarter worker2: - image: squirrel:latest + image: squirrel.worker:latest container_name: worker2 environment: - HOBBIT_RABBIT_HOST=rabbit - OUTPUT_FOLDER=/var/squirrel/data - HTML_SCRAPER_YAML_PATH=/var/squirrel/yaml - - CONTEXT_CONFIG_FILE=/var/squirrel/spring-config/context-sparqlStoreBased.xml + - CONTEXT_CONFIG_FILE=/var/squirrel/spring-config/context.xml - SPARQL_HOST_NAME=sparqlhost #- CKAN_WHITELIST_FILE=/var/squirrel/ckanwhitelist.txt - SPARQL_HOST_PORT=3030 - DEDUPLICATION_ACTIVE=true - - RDB_HOST_NAME=rethinkdb - - RDB_PORT=28015 + - MDB_HOST_NAME=mongodb + - MDB_PORT=27017 #-CKAN_PORT= volumes: - ./data/worker2:/var/squirrel/data @@ -114,19 +117,19 @@ services: command: java -cp squirrel.jar org.dice_research.squirrel.components.WorkerComponentStarter worker3: - image: squirrel:latest + image: squirrel.worker:latest container_name: worker3 environment: - HOBBIT_RABBIT_HOST=rabbit - OUTPUT_FOLDER=/var/squirrel/data - HTML_SCRAPER_YAML_PATH=/var/squirrel/yaml - - CONTEXT_CONFIG_FILE=/var/squirrel/spring-config/context-sparqlStoreBased.xml + - CONTEXT_CONFIG_FILE=/var/squirrel/spring-config/context.xml - SPARQL_HOST_NAME=sparqlhost #- 
CKAN_WHITELIST_FILE=/var/squirrel/ckanwhitelist.txt - SPARQL_HOST_PORT=3030 - DEDUPLICATION_ACTIVE=true - - RDB_HOST_NAME=rethinkdb - - RDB_PORT=28015 + - MDB_HOST_NAME=mongodb + - MDB_PORT=27017 #-CKAN_PORT= volumes: - ./data/worker3:/var/squirrel/data @@ -142,8 +145,8 @@ services: DEDUPLICATION_ACTIVE: "true" HOBBIT_RABBIT_HOST: rabbit OUTPUT_FOLDER: /var/squirrel/data - RDB_HOST_NAME: rethinkdb - RDB_PORT: 28015 + MDB_HOST_NAME: mongodb + MDB_PORT: 27017 SPARQL_HOST_NAME: sparqlhost SPARQL_HOST_PORT: 3030 SERVICE_PRECONDITION: "rethinkdb:28015 rabbit:5672" diff --git a/docker-compose.yml b/docker-compose.yml index b64e8a21e..dcd855232 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,19 +14,30 @@ services: container_name: frontier environment: - HOBBIT_RABBIT_HOST=rabbit - - SEED_FILE=/var/squirrel/seeds.txt - URI_WHITELIST_FILE=/var/squirrel/whitelist.txt - - RDB_HOST_NAME=rethinkdb - - RDB_PORT=28015 + - SEED_FILE=/var/squirrel/seeds.txt - MDB_HOST_NAME=mongodb - MDB_PORT=27017 - COMMUNICATION_WITH_WEBSERVICE=false - VISUALIZATION_OF_CRAWLED_GRAPH=false + - JVM_ARGS=-Xmx8g volumes: - ./data/frontier:/var/squirrel/data - ./seed/seeds.txt:/var/squirrel/seeds.txt:ro - - ./whitelist/ckanwhitelist.txt:/var/squirrel/whitelist.txt:ro -# command: java -cp squirrel.jar org.hobbit.core.run.ComponentStarter org.dice_research.squirrel.components.FrontierComponent + - ./whitelist/whitelist.txt:/var/squirrel/whitelist.txt:ro + command: java -cp squirrel.jar org.hobbit.core.run.ComponentStarter org.dice_research.squirrel.components.FrontierComponent + + sparqlhost: + image: stain/jena-fuseki + container_name: sparqlhost + ports: + - "3030:3030" + volumes: + - ./data/sparqlhost/sparqlhost_data:/fuseki + environment: + - ADMIN_PASSWORD=pw123 + - JVM_ARGS=-Xmx2g + mongodb: image: mongo:4.0.0 volumes: @@ -39,7 +50,8 @@ services: volumes: - ./data/rethinkdb:/data ports: - - "28015:28015" + - "8080:8080" + command: rethinkdb --bind all # message bus rabbit: @@ -58,15 +70,18 @@ services: - HOBBIT_RABBIT_HOST=rabbit - OUTPUT_FOLDER=/var/squirrel/data - HTML_SCRAPER_YAML_PATH=/var/squirrel/yaml - - CONTEXT_CONFIG_FILE=/var/squirrel/spring-config/context-fileBased.xml - - DEDUPLICATION_ACTIVE=true - - RDB_HOST_NAME=rethinkdb - - RDB_PORT=28015 + - CONTEXT_CONFIG_FILE=/var/squirrel/spring-config/context.xml + - SPARQL_HOST_NAME=sparqlhost + - SPARQL_HOST_PORT=3030 + - DEDUPLICATION_ACTIVE=false + - MDB_HOST_NAME=mongodb + - MDB_PORT=27017 + - JVM_ARGS=-Xmx8g volumes: - ./data/worker1:/var/squirrel/data - ./yaml:/var/squirrel/yaml - ./spring-config:/var/squirrel/spring-config -# command: java -cp squirrel.jar org.dice_research.squirrel.components.WorkerComponentStarter + command: java -cp squirrel.jar org.dice_research.squirrel.components.WorkerComponentStarter worker2: image: squirrel.worker:latest @@ -76,14 +91,17 @@ services: - OUTPUT_FOLDER=/var/squirrel/data - HTML_SCRAPER_YAML_PATH=/var/squirrel/yaml - CONTEXT_CONFIG_FILE=/var/squirrel/spring-config/context-fileBased.xml - - DEDUPLICATION_ACTIVE=true - - RDB_HOST_NAME=rethinkdb - - RDB_PORT=28015 + - SPARQL_HOST_NAME=sparqlhost + - SPARQL_HOST_PORT=3030 + - DEDUPLICATION_ACTIVE=false + - MDB_HOST_NAME=mongodb + - MDB_PORT=27017 + - JVM_ARGS=-Xmx8g volumes: - ./data/worker2:/var/squirrel/data - ./yaml:/var/squirrel/yaml - ./spring-config:/var/squirrel/spring-config -# command: java -cp squirrel.jar org.dice_research.squirrel.components.WorkerComponentStarter + command: java -cp squirrel.jar 
org.dice_research.squirrel.components.WorkerComponentStarter worker3: image: squirrel.worker:latest container_name: worker3 environment: - HOBBIT_RABBIT_HOST=rabbit - OUTPUT_FOLDER=/var/squirrel/data - HTML_SCRAPER_YAML_PATH=/var/squirrel/yaml - CONTEXT_CONFIG_FILE=/var/squirrel/spring-config/context-fileBased.xml + - SPARQL_HOST_NAME=sparqlhost + - SPARQL_HOST_PORT=3030 - DEDUPLICATION_ACTIVE=true - - RDB_HOST_NAME=rethinkdb - - RDB_PORT=28015 + - MDB_HOST_NAME=mongodb + - MDB_PORT=27017 + - JVM_ARGS=-Xmx8g volumes: - ./data/worker3:/var/squirrel/data - ./yaml:/var/squirrel/yaml - ./spring-config:/var/squirrel/spring-config -# command: java -cp squirrel.jar org.dice_research.squirrel.components.WorkerComponentStarter - -# deduplicator: -# image: squirrel -# container_name: deduplicator -# environment: -# DEDUPLICATION_ACTIVE: "true" -# HOBBIT_RABBIT_HOST: rabbit -# OUTPUT_FOLDER: /var/squirrel/data -# RDB_HOST_NAME: rethinkdb -# RDB_PORT: 28015 -# SPARQL_HOST_NAME: sparqlhost -# SPARQL_HOST_PORT: 3030 -# SERVICE_PRECONDITION: "rethinkdb:28015 rabbit:5672" -# volumes: -# - ./data/deduplicator:/var/squirrel/data -# command: java -cp squirrel.jar org.hobbit.core.run.ComponentStarter org.dice_research.squirrel.components.DeduplicatorComponent + command: java -cp squirrel.jar org.dice_research.squirrel.components.WorkerComponentStarter + deduplicator: + image: squirrel + container_name: deduplicator + environment: + DEDUPLICATION_ACTIVE: "true" + HOBBIT_RABBIT_HOST: rabbit + OUTPUT_FOLDER: /var/squirrel/data + MDB_HOST_NAME: mongodb + MDB_PORT: 27017 + SPARQL_HOST_NAME: sparqlhost + SPARQL_HOST_PORT: 3030 + SERVICE_PRECONDITION: "mongodb:27017 rabbit:5672" + volumes: + - ./data/deduplicator:/var/squirrel/data + command: java -cp squirrel.jar org.hobbit.core.run.ComponentStarter org.dice_research.squirrel.components.DeduplicatorComponent
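For illustration, a minimal sketch of the environment lookup behind the MDB_* variables above. MongoConfiguration itself is not part of this diff, so the class body below is an assumption; only getMDBConfiguration(), getMDBHostName() and getMDBPort() are taken from their call sites in the FrontierComponent hunk further down.

// Sketch under assumptions: everything except the three method names is invented here.
public class MongoConfigurationSketch {
    private final String hostName; // from MDB_HOST_NAME, e.g. "mongodb"
    private final Integer port;    // from MDB_PORT, e.g. 27017

    private MongoConfigurationSketch(String hostName, Integer port) {
        this.hostName = hostName;
        this.port = port;
    }

    // FrontierComponent null-checks the result, so missing variables map to null here.
    public static MongoConfigurationSketch getMDBConfiguration() {
        String host = System.getenv("MDB_HOST_NAME");
        String port = System.getenv("MDB_PORT");
        return (host == null || port == null) ? null : new MongoConfigurationSketch(host, Integer.valueOf(port));
    }

    public String getMDBHostName() { return hostName; }
    public Integer getMDBPort() { return port; }
}

diff --git a/pom.xml b/pom.xml index b8fbe1dc3..21ea11409 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.dice-research squirrel - 0.3.0-SNAPSHOT + 0.3.0 pom 2017 Squirrel @@ -33,12 +33,12 @@ + squirrel.web-api + squirrel.web squirrel.api squirrel.deduplication squirrel.frontier squirrel.mockup - squirrel.web - squirrel.web-api squirrel.worker diff --git a/seed/seeds.txt b/seed/seeds.txt index 86760d0aa..d2a69b00a 100644 --- a/seed/seeds.txt +++ b/seed/seeds.txt @@ -1,3 +1,3 @@ https://dbpedia.org/resource/New_York https://dbpedia.org/resource/Moscow -https://dbpedia.org/resource/Brazil \ No newline at end of file +https://dbpedia.org/resource/China diff --git a/spring-config/context-sparql.xml b/spring-config/context-sparql.xml index a6f209ed4..862b4702e 100644 --- a/spring-config/context-sparql.xml +++ b/spring-config/context-sparql.xml @@ -32,41 +32,68 @@ - - - - - - - - + + + + + + + + + - org.dice_research.squirrel.analyzer.impl.HDTAnalyzer org.dice_research.squirrel.analyzer.impl.RDFAnalyzer - org.dice_research.squirrel.analyzer.impl.MicrodataParser + org.dice_research.squirrel.analyzer.impl.HDTAnalyzer + org.dice_research.squirrel.analyzer.impl.html.scraper.HTMLScraperAnalyzer + org.dice_research.squirrel.analyzer.impl.RDFaSemarglParser + org.dice_research.squirrel.analyzer.impl.MicrodataParser org.dice_research.squirrel.analyzer.impl.MicroformatMF2JParser - org.dice_research.squirrel.analyzer.impl.RDFaSemarglParser - org.dice_research.squirrel.analyzer.impl.html.scraper.HTMLScraperAnalyzer + org.dice_research.squirrel.analyzer.impl.ckan.CkanJsonAnalyzer - - + + + + + + + + + + + + + + + + + + + + + + + + + + + - @@ -76,22 +103,23 @@ --> - + +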
value="europeandataportal/update"/> + value="europeandataportal/query"/> + diff --git a/spring-config/context.xml b/spring-config/context.xml index 1bc9bab4c..3ee740ebf 100644 --- a/spring-config/context.xml +++ b/spring-config/context.xml @@ -32,32 +32,59 @@ - - - - - - - - + + + + + + + + + - org.dice_research.squirrel.analyzer.impl.HDTAnalyzer org.dice_research.squirrel.analyzer.impl.RDFAnalyzer - org.dice_research.squirrel.analyzer.impl.MicrodataParser + org.dice_research.squirrel.analyzer.impl.HDTAnalyzer + org.dice_research.squirrel.analyzer.impl.html.scraper.HTMLScraperAnalyzer + org.dice_research.squirrel.analyzer.impl.RDFaSemarglParser + org.dice_research.squirrel.analyzer.impl.MicrodataParser org.dice_research.squirrel.analyzer.impl.MicroformatMF2JParser - org.dice_research.squirrel.analyzer.impl.RDFaSemarglParser - org.dice_research.squirrel.analyzer.impl.html.scraper.HTMLScraperAnalyzer + org.dice_research.squirrel.analyzer.impl.ckan.CkanJsonAnalyzer - - + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -66,7 +93,7 @@ - + @@ -83,9 +110,9 @@ + value="europeandataportal/update"/> + value="europeandataportal/query"/> - - - - - - - - - - - - diff --git a/squirrel.api/pom.xml b/squirrel.api/pom.xml index 8e16a6ad6..2f80bd665 100644 --- a/squirrel.api/pom.xml +++ b/squirrel.api/pom.xml @@ -5,7 +5,7 @@ org.dice-research squirrel - 0.3.0-SNAPSHOT + 0.3.0 squirrel.api jar diff --git a/squirrel.api/src/main/java/org/dice_research/squirrel/frontier/ExtendedFrontier.java b/squirrel.api/src/main/java/org/dice_research/squirrel/frontier/ExtendedFrontier.java index b3ea8bc25..27af41294 100644 --- a/squirrel.api/src/main/java/org/dice_research/squirrel/frontier/ExtendedFrontier.java +++ b/squirrel.api/src/main/java/org/dice_research/squirrel/frontier/ExtendedFrontier.java @@ -13,5 +13,5 @@ public interface ExtendedFrontier extends Frontier { * @param lstUrisToReassign A list of {@link CrawleableUri} that should have been handeled by the * dead worker, but was not due to his sudden death. 
*/ - void informAboutDeadWorker(String idOfWorker, List lstUrisToReassign); + void informAboutDeadWorker(int idOfWorker, List lstUrisToReassign); } \ No newline at end of file diff --git a/squirrel.api/src/main/java/org/dice_research/squirrel/queue/InMemoryQueue.java b/squirrel.api/src/main/java/org/dice_research/squirrel/queue/InMemoryQueue.java index c5b7016b6..e808e0882 100644 --- a/squirrel.api/src/main/java/org/dice_research/squirrel/queue/InMemoryQueue.java +++ b/squirrel.api/src/main/java/org/dice_research/squirrel/queue/InMemoryQueue.java @@ -1,5 +1,7 @@ package org.dice_research.squirrel.queue; +import java.net.InetAddress; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; @@ -12,6 +14,7 @@ public class InMemoryQueue extends AbstractIpAddressBasedQueue { protected SortedMap> queue; + private static final int LIMITFORITERATOR = 50; public InMemoryQueue() { queue = new TreeMap>(); @@ -54,5 +57,10 @@ public void open() { @Override public void close() { } + + @Override + public Iterator>> getIPURIIterator() { + return queue.entrySet().stream().limit(LIMITFORITERATOR).map(e -> new AbstractMap.SimpleEntry<>(e.getKey().ip, e.getValue())).iterator(); + } } diff --git a/squirrel.api/src/main/java/org/dice_research/squirrel/queue/IpAddressBasedQueue.java b/squirrel.api/src/main/java/org/dice_research/squirrel/queue/IpAddressBasedQueue.java index 7f7cc842a..e3118c87a 100644 --- a/squirrel.api/src/main/java/org/dice_research/squirrel/queue/IpAddressBasedQueue.java +++ b/squirrel.api/src/main/java/org/dice_research/squirrel/queue/IpAddressBasedQueue.java @@ -1,6 +1,11 @@ package org.dice_research.squirrel.queue; import java.net.InetAddress; +import java.util.AbstractMap; +import java.util.Iterator; +import java.util.List; + +import org.dice_research.squirrel.data.uri.CrawleableUri; /** * This extension of the {@link UriQueue} interface defines additional methods @@ -29,4 +34,10 @@ public interface IpAddressBasedQueue extends UriQueue { * @return the number of IP addresses that are currently blocked. */ public int getNumberOfBlockedIps(); + /** + * Goes through the queue and collects all IP addresses with their URIs. + * + * @return an iterator over the IP addresses, each paired with its list of pending URIs + */ + Iterator>> getIPURIIterator(); }
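For illustration, a minimal sketch of how a caller can consume the new getIPURIIterator() API; IpAddressBasedQueue and CrawleableUri come from this diff, while the dump method and its print format are hypothetical.

import java.net.InetAddress;
import java.util.AbstractMap.SimpleEntry;
import java.util.Iterator;
import java.util.List;
import org.dice_research.squirrel.data.uri.CrawleableUri;
import org.dice_research.squirrel.queue.IpAddressBasedQueue;

public class QueueIteratorExample {
    // Prints, for each IP address in the queue, how many URIs are pending.
    // Note that InMemoryQueue caps this iterator at 50 entries (LIMITFORITERATOR).
    public static void dump(IpAddressBasedQueue queue) {
        Iterator<SimpleEntry<InetAddress, List<CrawleableUri>>> it = queue.getIPURIIterator();
        while (it.hasNext()) {
            SimpleEntry<InetAddress, List<CrawleableUri>> entry = it.next();
            System.out.println(entry.getKey().getHostAddress() + " -> " + entry.getValue().size() + " pending URIs");
        }
    }
}

diff --git a/squirrel.api/src/main/java/org/dice_research/squirrel/rabbit/msgs/UriSetRequest.java b/squirrel.api/src/main/java/org/dice_research/squirrel/rabbit/msgs/UriSetRequest.java index 6dfff7742..240aaa38d 100644 --- a/squirrel.api/src/main/java/org/dice_research/squirrel/rabbit/msgs/UriSetRequest.java +++ b/squirrel.api/src/main/java/org/dice_research/squirrel/rabbit/msgs/UriSetRequest.java @@ -9,7 +9,7 @@ public class UriSetRequest implements Serializable { /** * The id of the {@link org.dice_research.squirrel.worker.Worker} that sent this request. */ - private String idOfWorker; + private int idOfWorker; /** * Indicates whether the worker (see {@link #idOfWorker}) sends {@link org.dice_research.squirrel.worker.impl.AliveMessage}. */ @@ -20,7 +20,7 @@ * Standard constructor setting just default values.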
*/ public UriSetRequest() { - this(null, false); + this(0, false); } /** @@ -29,12 +29,12 @@ public UriSetRequest() { * @param idOfWorker * @param workerSendsAliveMessages */ - public UriSetRequest(String idOfWorker, boolean workerSendsAliveMessages) { + public UriSetRequest(int idOfWorker, boolean workerSendsAliveMessages) { this.idOfWorker = idOfWorker; this.workerSendsAliveMessages = workerSendsAliveMessages; } - public String getIdOfWorker() { + public int getIdOfWorker() { return idOfWorker; } diff --git a/squirrel.api/src/main/java/org/dice_research/squirrel/worker/AliveMessage.java b/squirrel.api/src/main/java/org/dice_research/squirrel/worker/AliveMessage.java index 11209599d..3becf4d39 100644 --- a/squirrel.api/src/main/java/org/dice_research/squirrel/worker/AliveMessage.java +++ b/squirrel.api/src/main/java/org/dice_research/squirrel/worker/AliveMessage.java @@ -16,14 +16,14 @@ public class AliveMessage implements Serializable { /** * The id of the worker that sends the alive message. */ - private String idOfWorker; + private int idOfWorker; /** * Create aliveMessage by an id of a worker. * * @param idOfWorker The id of the worker. */ - public AliveMessage(String idOfWorker) { + public AliveMessage(int idOfWorker) { this.idOfWorker = idOfWorker; } @@ -32,7 +32,7 @@ public AliveMessage(String idOfWorker) { * * @return the id of the worker. */ - public String getIdOfWorker() { + public int getIdOfWorker() { return idOfWorker; } diff --git a/squirrel.api/src/main/java/org/dice_research/squirrel/worker/Worker.java b/squirrel.api/src/main/java/org/dice_research/squirrel/worker/Worker.java index eb33acb1f..60d194155 100644 --- a/squirrel.api/src/main/java/org/dice_research/squirrel/worker/Worker.java +++ b/squirrel.api/src/main/java/org/dice_research/squirrel/worker/Worker.java @@ -38,6 +38,12 @@ public interface Worker extends Runnable { * @return True iff the worker sends alive messages. */ boolean sendsAliveMessages(); + + /** + * Gives the unique id of the worker. + * @return The id of the worker. 
+ */ + int getId(); public void setTerminateFlag(boolean terminateFlag); diff --git a/squirrel.deduplication/pom.xml b/squirrel.deduplication/pom.xml index af94629ca..1e5fa9d11 100644 --- a/squirrel.deduplication/pom.xml +++ b/squirrel.deduplication/pom.xml @@ -6,7 +6,7 @@ org.dice-research squirrel - 0.3.0-SNAPSHOT + 0.3.0 squirrel.deduplication jar diff --git a/squirrel.frontier/pom.xml b/squirrel.frontier/pom.xml index 862565d78..5bf998cc0 100644 --- a/squirrel.frontier/pom.xml +++ b/squirrel.frontier/pom.xml @@ -6,7 +6,7 @@ org.dice-research squirrel - 0.3.0-SNAPSHOT + 0.3.0 squirrel.frontier jar diff --git a/squirrel.frontier/src/main/java/org/dice_research/squirrel/components/FrontierComponent.java b/squirrel.frontier/src/main/java/org/dice_research/squirrel/components/FrontierComponent.java index eb6ceacb5..38cafcebe 100644 --- a/squirrel.frontier/src/main/java/org/dice_research/squirrel/components/FrontierComponent.java +++ b/squirrel.frontier/src/main/java/org/dice_research/squirrel/components/FrontierComponent.java @@ -11,6 +11,7 @@ import org.dice_research.squirrel.Constants; import org.dice_research.squirrel.configurator.MongoConfiguration; import org.dice_research.squirrel.configurator.SeedConfiguration; +import org.dice_research.squirrel.configurator.WebConfiguration; import org.dice_research.squirrel.configurator.WhiteListConfiguration; import org.dice_research.squirrel.data.uri.CrawleableUri; import org.dice_research.squirrel.data.uri.UriUtils; @@ -26,6 +27,7 @@ import org.dice_research.squirrel.frontier.Frontier; import org.dice_research.squirrel.frontier.impl.ExtendedFrontierImpl; import org.dice_research.squirrel.frontier.impl.FrontierImpl; +import org.dice_research.squirrel.frontier.impl.FrontierSenderToWebservice; import org.dice_research.squirrel.frontier.impl.WorkerGuard; import org.dice_research.squirrel.queue.InMemoryQueue; import org.dice_research.squirrel.queue.IpAddressBasedQueue; @@ -66,6 +68,7 @@ public void init() throws Exception { super.init(); serializer = new GzipJavaUriSerializer(); MongoConfiguration mongoConfiguration = MongoConfiguration.getMDBConfiguration(); + WebConfiguration webConfiguration = WebConfiguration.getWebConfiguration(); if(mongoConfiguration != null) { String dbHostName = mongoConfiguration.getMDBHostName(); Integer dbPort = mongoConfiguration.getMDBPort(); @@ -106,22 +109,22 @@ public void init() throws Exception { LOGGER.info("Frontier initialized."); -// if (webConfiguration.isCommunicationWithWebserviceEnabled()) { -// final FrontierSenderToWebservice sender = new FrontierSenderToWebservice(outgoingDataQueuefactory, -// workerGuard, queue, knownUriFilter, uriReferences); -// LOGGER.trace("FrontierSenderToWebservice -> sendCrawledGraph is set to " -// + webConfiguration.isVisualizationOfCrawledGraphEnabled()); -// Thread senderThread = new Thread(sender); -// senderThread.setName("Sender to the Webservice via RabbitMQ (current information from the Frontier)"); -// senderThread.start(); -// LOGGER.info("Started thread [" + senderThread.getName() + "] "); -// } else { -// LOGGER.info("webConfiguration.isCommunicationWithWebserviceEnabled is set to " -// + webConfiguration.isCommunicationWithWebserviceEnabled() + "/" -// + webConfiguration.isVisualizationOfCrawledGraphEnabled() -// + ". 
No WebServiceSenderThread will be started!"); -// } + if (webConfiguration.isCommunicationWithWebserviceEnabled()) { + final FrontierSenderToWebservice sender = new FrontierSenderToWebservice(outgoingDataQueuefactory, + workerGuard, queue, knownUriFilter, uriReferences); + LOGGER.trace("FrontierSenderToWebservice -> sendCrawledGraph is set to " + + webConfiguration.isVisualizationOfCrawledGraphEnabled()); + Thread senderThread = new Thread(sender); + senderThread.setName("Sender to the Webservice via RabbitMQ (current information from the Frontier)"); + senderThread.start(); + LOGGER.info("Started thread [" + senderThread.getName() + "] "); + } else { + LOGGER.info("webConfiguration.isCommunicationWithWebserviceEnabled is set to " + + webConfiguration.isCommunicationWithWebserviceEnabled() + "/" + + webConfiguration.isVisualizationOfCrawledGraphEnabled() + + ". No WebServiceSenderThread will be started!"); + } } @Override @@ -189,7 +192,7 @@ public void handleData(byte[] data, ResponseHandler handler, String responseQueu crawlingResult.uris); } else if (deserializedData instanceof AliveMessage) { AliveMessage message = (AliveMessage) deserializedData; - String idReceived = message.getIdOfWorker(); + int idReceived = message.getIdOfWorker(); LOGGER.trace("Received alive message from worker with id " + idReceived); workerGuard.putNewTimestamp(idReceived); } else { @@ -228,7 +231,7 @@ protected void processSeedFile(String seedFile) { } } - public void informFrontierAboutDeadWorker(String idOfWorker, List lstUrisToReassign) { + public void informFrontierAboutDeadWorker(int idOfWorker, List lstUrisToReassign) { if (frontier instanceof ExtendedFrontier) { ((ExtendedFrontier) frontier).informAboutDeadWorker(idOfWorker, lstUrisToReassign); } diff --git a/squirrel.frontier/src/main/java/org/dice_research/squirrel/configurator/WebConfiguration.java b/squirrel.frontier/src/main/java/org/dice_research/squirrel/configurator/WebConfiguration.java new file mode 100644 index 000000000..eecec3b9b --- /dev/null +++ b/squirrel.frontier/src/main/java/org/dice_research/squirrel/configurator/WebConfiguration.java @@ -0,0 +1,32 @@ +package org.dice_research.squirrel.configurator; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebConfiguration extends Configuration { + private static final Logger LOGGER = LoggerFactory.getLogger(WebConfiguration.class); + + private boolean communicationWithWebserviceEnabled; + private boolean visualizationOfCrawledGraphEnabled; + + private static final String COMMUNICATION_WITH_WEBSERVICE = "COMMUNICATION_WITH_WEBSERVICE"; + private static final String VISUALIZATION_OF_CRAWLED_GRAPH = "VISUALIZATION_OF_CRAWLED_GRAPH"; + + private WebConfiguration(boolean communicationWithWebserviceEnabled, boolean visualizationOfCrawledGraphEnabled) { + this.communicationWithWebserviceEnabled = communicationWithWebserviceEnabled; + this.visualizationOfCrawledGraphEnabled = visualizationOfCrawledGraphEnabled; + } + + public static WebConfiguration getWebConfiguration() { + return new WebConfiguration(Configuration.getEnvBoolean(COMMUNICATION_WITH_WEBSERVICE, LOGGER), Configuration.getEnvBoolean(VISUALIZATION_OF_CRAWLED_GRAPH, LOGGER)); + } + + public boolean isCommunicationWithWebserviceEnabled() { + return communicationWithWebserviceEnabled; + } + + public boolean isVisualizationOfCrawledGraphEnabled() { + return visualizationOfCrawledGraphEnabled; + } +}
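Configuration.getEnvBoolean(...) is inherited from the configurator base class, which this diff does not include. The following sketch only illustrates the behavior implied by the call above; the signature comes from the call site, the unset-variable fallback is an assumption.

import org.slf4j.Logger;

public class GetEnvBooleanSketch {
    // Assumed semantics: parse the variable if present, otherwise warn and default to false.
    public static boolean getEnvBoolean(String name, Logger logger) {
        String value = System.getenv(name);
        if (value == null) {
            logger.warn("Environment variable {} is not set, defaulting to false.", name);
            return false;
        }
        return Boolean.parseBoolean(value);
    }
}

diff --git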
a/squirrel.frontier/src/main/java/org/dice_research/squirrel/data/uri/filter/MongoDBKnowUriFilter.java b/squirrel.frontier/src/main/java/org/dice_research/squirrel/data/uri/filter/MongoDBKnowUriFilter.java index aff7c54a0..22fc936b9 100644 --- a/squirrel.frontier/src/main/java/org/dice_research/squirrel/data/uri/filter/MongoDBKnowUriFilter.java +++ b/squirrel.frontier/src/main/java/org/dice_research/squirrel/data/uri/filter/MongoDBKnowUriFilter.java @@ -7,7 +7,6 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -22,9 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.mongodb.BasicDBList; import com.mongodb.BasicDBObject; -import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; diff --git a/squirrel.frontier/src/main/java/org/dice_research/squirrel/data/uri/norm/NormalizerImpl.java b/squirrel.frontier/src/main/java/org/dice_research/squirrel/data/uri/norm/NormalizerImpl.java index 5775f0ed6..14685da39 100644 --- a/squirrel.frontier/src/main/java/org/dice_research/squirrel/data/uri/norm/NormalizerImpl.java +++ b/squirrel.frontier/src/main/java/org/dice_research/squirrel/data/uri/norm/NormalizerImpl.java @@ -23,113 +23,115 @@ */ public class NormalizerImpl implements UriNormalizer { - /** - * Nutch 1098 - finds URL encoded parts of the URL - */ - private final static Pattern UNESCAPE_RULE_PATTERN = Pattern.compile("%([0-9A-Fa-f]{2})"); - /** - * look-up table for characters which should not be escaped in URL paths - */ - private final static BitSet UNESCAPED_CHARS = new BitSet(0x7F); + /** + * Nutch 1098 - finds URL encoded parts of the URL + */ + private final static Pattern UNESCAPE_RULE_PATTERN = Pattern.compile("%([0-9A-Fa-f]{2})"); + /** + * look-up table for characters which should not be escaped in URL paths + */ + private final static BitSet UNESCAPED_CHARS = new BitSet(0x7F); - static { - /* - * https://tools.ietf.org/html/rfc3986#section-2.2 For consistency, - * percent-encoded octets in the ranges of ALPHA (%41-%5A and %61-%7A), DIGIT - * (%30-%39), hyphen (%2D), period (%2E), underscore (%5F), or tilde (%7E) - * should not be created by URI producers and, when found in a URI, should be - * decoded to their corresponding unreserved characters by URI normalizers. - */ - UNESCAPED_CHARS.set(0x2D, 0x2E); - UNESCAPED_CHARS.set(0x30, 0x39); - UNESCAPED_CHARS.set(0x41, 0x5A); - UNESCAPED_CHARS.set(0x61, 0x7A); - UNESCAPED_CHARS.set(0x5F); - UNESCAPED_CHARS.set(0x7E); - } + static { + /* + * https://tools.ietf.org/html/rfc3986#section-2.2 For consistency, + * percent-encoded octets in the ranges of ALPHA (%41-%5A and %61-%7A), DIGIT + * (%30-%39), hyphen (%2D), period (%2E), underscore (%5F), or tilde (%7E) + * should not be created by URI producers and, when found in a URI, should be + * decoded to their corresponding unreserved characters by URI normalizers. 
+ */ + UNESCAPED_CHARS.set(0x2D, 0x2E); + UNESCAPED_CHARS.set(0x30, 0x39); + UNESCAPED_CHARS.set(0x41, 0x5A); + UNESCAPED_CHARS.set(0x61, 0x7A); + UNESCAPED_CHARS.set(0x5F); + UNESCAPED_CHARS.set(0x7E); + } - @Override - public CrawleableUri normalize(CrawleableUri uri) { - URI uriObject = uri.getUri(); - boolean changed = false; - // normalize path - String path = uriObject.getPath(); - String temp = normalizePath(path); - if (temp != path) { - path = temp; - } - // Copy Normalization from - // https://github.com/crawler-commons/crawler-commons/blob/master/src/main/java/crawlercommons/filters/basic/BasicURLNormalizer.java - // OR use URI.normalize() + @Override + public CrawleableUri normalize(CrawleableUri uri) { + URI uriObject = uri.getUri(); + boolean changed = false; + // normalize path + String path = uriObject.getPath(); + if (path != null) { + String temp = normalizePath(path); + if (temp != path) { + path = temp; + } + } + // Copy Normalization from + // https://github.com/crawler-commons/crawler-commons/blob/master/src/main/java/crawlercommons/filters/basic/BasicURLNormalizer.java + // OR use URI.normalize() - // Check whether the query part of a URI has to be sorted + // Check whether the query part of a URI has to be sorted - // Filter attributes of the URI - //uriObject.getQuery(); + // Filter attributes of the URI + // uriObject.getQuery(); - if (changed) { - // TODO create new URI object; - } - return uri; - } + if (changed) { + // TODO create new URI object; + } + return uri; + } - /** - * Path normalization adapted from the {@link URI} class (which is based upon - * src/solaris/native/java/io/canonicalize_md.c) and the Crawler - * Commons project. - * - * @param path - * @return the normalized path or the given path object if no changes have been - * made. - */ - protected String normalizePath(String path) { - // Check for encoded parts - Matcher matcher = UNESCAPE_RULE_PATTERN.matcher(path); - StringBuffer changedPath = null; - if (matcher.find()) { - changedPath = new StringBuffer(path); - int hex, pos = 0; - do { - changedPath.append(path.substring(pos, matcher.start())); - pos = matcher.start(); - hex = getHexValue(path.charAt(pos + 1), path.charAt(pos + 2)); - // If this character shouldn't be escaped - if (UNESCAPED_CHARS.get(hex)) { - changedPath.append((char) hex); - } else { - changedPath.append(path.substring(pos, pos + 3)); - } - pos += 3; - } while (matcher.find()); - if (pos < path.length()) { - changedPath.append(path.substring(pos)); - } - } - if (changedPath == null) { - return PathNormalization.normalize(path); - } else { - String newPath = changedPath.toString(); - return PathNormalization.normalize(newPath.equals(path) ? path : newPath); - } - } + /** + * Path normalization adapted from the {@link URI} class (which is based upon + * src/solaris/native/java/io/canonicalize_md.c) and the Crawler + * Commons project. + * + * @param path + * @return the normalized path or the given path object if no changes have been + * made. 
+ */ + protected String normalizePath(String path) { + // Check for encoded parts + Matcher matcher = UNESCAPE_RULE_PATTERN.matcher(path); + StringBuffer changedPath = null; + if (matcher.find()) { + changedPath = new StringBuffer(path); + int hex, pos = 0; + do { + changedPath.append(path.substring(pos, matcher.start())); + pos = matcher.start(); + hex = getHexValue(path.charAt(pos + 1), path.charAt(pos + 2)); + // If this character shouldn't be escaped + if (UNESCAPED_CHARS.get(hex)) { + changedPath.append((char) hex); + } else { + changedPath.append(path.substring(pos, pos + 3)); + } + pos += 3; + } while (matcher.find()); + if (pos < path.length()) { + changedPath.append(path.substring(pos)); + } + } + if (changedPath == null) { + return PathNormalization.normalize(path); + } else { + String newPath = changedPath.toString(); + return PathNormalization.normalize(newPath.equals(path) ? path : newPath); + } + } - protected static int getHexValue(char c1, char c2) { - int hex; - if (c1 <= 0x39) { - hex = c1 - 0x30; - } else { - // Check whether it is A-F or a-f - hex = (c1 <= 0x46) ? (c1 - 0x37) : (c1 - 0x57); - } - hex <<= 4; - if (c2 <= 0x39) { - hex |= c2 - 0x30; - } else { - // Check whether it is A-F or a-f - hex |= (c2 <= 0x46) ? (c2 - 0x37) : (c2 - 0x57); - } - return hex; - } + protected static int getHexValue(char c1, char c2) { + int hex; + if (c1 <= 0x39) { + hex = c1 - 0x30; + } else { + // Check whether it is A-F or a-f + hex = (c1 <= 0x46) ? (c1 - 0x37) : (c1 - 0x57); + } + hex <<= 4; + if (c2 <= 0x39) { + hex |= c2 - 0x30; + } else { + // Check whether it is A-F or a-f + hex |= (c2 <= 0x46) ? (c2 - 0x37) : (c2 - 0x57); + } + return hex; + } } diff --git a/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/ExtendedFrontierImpl.java b/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/ExtendedFrontierImpl.java index 83b71fd42..65018bf4a 100644 --- a/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/ExtendedFrontierImpl.java +++ b/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/ExtendedFrontierImpl.java @@ -66,7 +66,7 @@ public ExtendedFrontierImpl(UriNormalizer normalizer, KnownUriFilter knownUriFil } @Override - public void informAboutDeadWorker(String idOfWorker, List lstUrisToReassign) { + public void informAboutDeadWorker(int idOfWorker, List lstUrisToReassign) { if (queue instanceof IpAddressBasedQueue) { IpAddressBasedQueue ipQueue = (IpAddressBasedQueue) queue; diff --git a/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/FrontierImpl.java b/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/FrontierImpl.java index 093d03966..de1ed3b8f 100644 --- a/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/FrontierImpl.java +++ b/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/FrontierImpl.java @@ -235,6 +235,7 @@ public void addNewUri(CrawleableUri uri) { // Make sure that the IP is known try { uri = this.uriProcessor.recognizeInetAddress(uri); + } catch (UnknownHostException e) { LOGGER.error("Could not recognize IP for {}, unknown host", uri.getUri()); } @@ -243,9 +244,11 @@ public void addNewUri(CrawleableUri uri) { } else { LOGGER.error("Couldn't determine the Inet address of \"{}\". 
It will be ignored.", uri.getUri()); } + knownUriFilter.add(uri, System.currentTimeMillis()); } else { LOGGER.warn("addNewUri(" + uri + "): " + uri.getUri().getScheme() + " is not supported, only " + schemeUriFilter.getSchemes() + ". Will not added!"); } + } else { LOGGER.info("addNewUri(" + uri + "): URI is not good [" + knownUriFilter + "]. Will not be added!"); } diff --git a/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/FrontierSenderToWebservice.java b/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/FrontierSenderToWebservice.java index 20487509e..8e3f694d0 100644 --- a/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/FrontierSenderToWebservice.java +++ b/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/FrontierSenderToWebservice.java @@ -1,17 +1,17 @@ package org.dice_research.squirrel.frontier.impl; + import com.SquirrelWebObject; import com.graph.VisualisationGraph; import com.graph.VisualisationNode; import com.rabbitmq.client.Channel; - -import org.apache.commons.io.IOUtils; import org.dice_research.squirrel.data.uri.CrawleableUri; import org.dice_research.squirrel.data.uri.filter.KnownUriFilter; import org.dice_research.squirrel.data.uri.info.URIReferences; import org.dice_research.squirrel.data.uri.serialize.Serializer; import org.dice_research.squirrel.data.uri.serialize.java.GzipJavaUriSerializer; import org.dice_research.squirrel.queue.IpAddressBasedQueue; +import org.apache.commons.io.IOUtils; import org.hobbit.core.rabbit.DataSender; import org.hobbit.core.rabbit.DataSenderImpl; import org.hobbit.core.rabbit.RabbitQueueFactory; @@ -161,31 +161,29 @@ private SquirrelWebObject generateSquirrelWebObject() throws IllegalAccessExcept LinkedHashMap> currentQueue = new LinkedHashMap<>(50); Iterator>> i; - // FIXME!!! - throw new RuntimeException("This is not correctly implemented!"); -// for (i = queue.getIPURIIterator(); i.hasNext() && currentQueue.size() < 50; ) { -// AbstractMap.SimpleEntry> entry = i.next(); -// currentQueue.put(entry.getKey(), entry.getValue()); -// } -// if (currentQueue.isEmpty()) { -// newObject.setIPMapPendingURis(EMPTY_MAP); -// newObject.setPendingURIs(EMPTY_LIST); -// newObject.setNextCrawledURIs(EMPTY_LIST); -// } else { -// newObject.setIPMapPendingURis(currentQueue.entrySet().stream() -// .map(e -> new AbstractMap.SimpleEntry<>(e.getKey().getHostAddress(), e.getValue().stream().map(uri -> uri.getUri().getPath()).collect(Collectors.toList()))) -// .collect(HashMap::new, (m, entry) -> m.put(entry.getKey(), entry.getValue()), HashMap::putAll)); -// List pendingURIs = new ArrayList<>(currentQueue.size()); -// currentQueue.forEach((key, value) -> value.forEach(uri -> pendingURIs.add(uri.getUri().toString()))); -// newObject.setPendingURIs(pendingURIs); -// newObject.setNextCrawledURIs(currentQueue.entrySet().iterator().next().getValue().stream().map(e -> e.getUri().toString()).collect(Collectors.toList())); -// } -// -// //Michael remarks, that's not a good idea to pass all crawled URIs, because that takes to much time... 
-// //newObject.setCrawledURIs(Collections.EMPTY_LIST); -// newObject.setCountOfCrawledURIs((int) knownUriFilter.count()); -// -// return newObject; + for (i = queue.getIPURIIterator(); i.hasNext() && currentQueue.size() < 50; ) { + AbstractMap.SimpleEntry> entry = i.next(); + currentQueue.put(entry.getKey(), entry.getValue()); + } + if (currentQueue.isEmpty()) { + newObject.setIPMapPendingURis(EMPTY_MAP); + newObject.setPendingURIs(EMPTY_LIST); + newObject.setNextCrawledURIs(EMPTY_LIST); + } else { + newObject.setIPMapPendingURis(currentQueue.entrySet().stream() + .map(e -> new AbstractMap.SimpleEntry<>(e.getKey().getHostAddress(), e.getValue().stream().map(uri -> uri.getUri().getPath()).collect(Collectors.toList()))) + .collect(HashMap::new, (m, entry) -> m.put(entry.getKey(), entry.getValue()), HashMap::putAll)); + List pendingURIs = new ArrayList<>(currentQueue.size()); + currentQueue.forEach((key, value) -> value.forEach(uri -> pendingURIs.add(uri.getUri().toString()))); + newObject.setPendingURIs(pendingURIs); + newObject.setNextCrawledURIs(currentQueue.entrySet().iterator().next().getValue().stream().map(e -> e.getUri().toString()).collect(Collectors.toList())); + } + + // Michael remarks that it is not a good idea to pass all crawled URIs, because that takes too much time... + //newObject.setCrawledURIs(Collections.EMPTY_LIST); + newObject.setCountOfCrawledURIs((int) knownUriFilter.count()); + + return newObject; }
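As a hypothetical counterpart to this receiving side: a worker holding one of the new int ids would emit an AliveMessage periodically, roughly as sketched below. AliveMessage and Worker are from this diff; the scheduling, the 10-second interval and the sender callback are assumptions.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.dice_research.squirrel.worker.AliveMessage;
import org.dice_research.squirrel.worker.Worker;

public class HeartbeatSketch {
    // Emits one AliveMessage per interval; the WorkerGuard below declares the
    // worker dead once these messages stop arriving.
    public static ScheduledExecutorService startHeartbeat(Worker worker, Consumer<AliveMessage> sender) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> sender.accept(new AliveMessage(worker.getId())), 0, 10, TimeUnit.SECONDS);
        return scheduler;
    }
}

/** diff --git a/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/WorkerGuard.java b/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/WorkerGuard.java index 3ee7f0cc9..b69ad1cb5 100644 --- a/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/WorkerGuard.java +++ b/squirrel.frontier/src/main/java/org/dice_research/squirrel/frontier/impl/WorkerGuard.java @@ -24,7 +24,7 @@ public class WorkerGuard { * A map from {@link org.dice_research.squirrel.worker.Worker} id to {@link WorkerInfo} containing information about the * {@link org.dice_research.squirrel.worker.Worker}. */ - private Map mapWorkerInfo = Collections.synchronizedMap(new HashMap<>()); + private Map mapWorkerInfo = Collections.synchronizedMap(new HashMap<>()); /** * After this period of time (in seconds), a worker is considered to be dead if he has not sent @@ -53,8 +53,8 @@ public WorkerGuard(FrontierComponent frontierComponent) { timer.schedule(new TimerTask() { @Override public void run() { - List lstIdsToBeRemoved = new ArrayList<>(); - for (String idWorker : mapWorkerInfo.keySet()) { + List lstIdsToBeRemoved = new ArrayList<>(); + for (int idWorker : mapWorkerInfo.keySet()) { if (mapWorkerInfo.get(idWorker).getDateLastAlive() == null) { continue; } @@ -88,7 +88,7 @@ public void run() { * * @param idOfWorker the given id. */ - public void putNewTimestamp(String idOfWorker) { + public void putNewTimestamp(int idOfWorker) { WorkerInfo workerInfo; if (mapWorkerInfo.containsKey(idOfWorker)) { workerInfo = mapWorkerInfo.get(idOfWorker); } @@ -105,7 +105,7 @@ * @param idOfWorker The id of the worker for which to put the uris. * @param lstUris The uris to put.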
*/ - public void putUrisForWorker(String idOfWorker, boolean workerSendsAliveMessages, List lstUris) { + public void putUrisForWorker(int idOfWorker, boolean workerSendsAliveMessages, List lstUris) { WorkerInfo workerInfo; if (mapWorkerInfo.containsKey(idOfWorker)) { workerInfo = mapWorkerInfo.get(idOfWorker); @@ -139,7 +139,7 @@ public void shutdown() { timer.cancel(); } - public Map getMapWorkerInfo() { + public Map getMapWorkerInfo() { return mapWorkerInfo; } diff --git a/squirrel.frontier/src/main/java/org/dice_research/squirrel/queue/MongoDBQueue.java b/squirrel.frontier/src/main/java/org/dice_research/squirrel/queue/MongoDBQueue.java index 4fb5db6d7..93f10319d 100644 --- a/squirrel.frontier/src/main/java/org/dice_research/squirrel/queue/MongoDBQueue.java +++ b/squirrel.frontier/src/main/java/org/dice_research/squirrel/queue/MongoDBQueue.java @@ -7,28 +7,28 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.AbstractMap; +import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import org.bson.Document; import org.bson.types.Binary; import org.dice_research.squirrel.data.uri.CrawleableUri; +import org.dice_research.squirrel.data.uri.CrawleableUriFactoryImpl; import org.dice_research.squirrel.data.uri.UriType; import org.dice_research.squirrel.data.uri.serialize.Serializer; import org.dice_research.squirrel.data.uri.serialize.java.SnappyJavaUriSerializer; -import org.dice_research.squirrel.queue.AbstractIpAddressBasedQueue; -import org.dice_research.squirrel.queue.IpUriTypePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mongodb.MongoClient; -import com.mongodb.MongoClientOptions; -import com.mongodb.ServerAddress; +import com.mongodb.MongoWriteException; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; -import com.mongodb.client.model.DropIndexOptions; import com.mongodb.client.model.Indexes; @SuppressWarnings("deprecation") @@ -38,7 +38,7 @@ public class MongoDBQueue extends AbstractIpAddressBasedQueue { private MongoDatabase mongoDB; private Serializer serializer; private final String DB_NAME ="squirrel"; - private final String COLLECTION_NAME = "queue"; + private final String COLLECTION_QUEUE = "queue"; private final String COLLECTION_URIS = "uris"; private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBQueue.class); @@ -57,12 +57,12 @@ public MongoDBQueue(String hostName, Integer port, Serializer serializer) { } public void purge() { - mongoDB.getCollection(COLLECTION_NAME).drop(); + mongoDB.getCollection(COLLECTION_QUEUE).drop(); mongoDB.getCollection(COLLECTION_URIS).drop(); } public long length() { - return mongoDB.getCollection(COLLECTION_NAME).count(); + return mongoDB.getCollection(COLLECTION_QUEUE).count(); } public static void main(String[] args) throws URISyntaxException, UnknownHostException { @@ -82,7 +82,7 @@ public static void main(String[] args) throws URISyntaxException, UnknownHostExc @Override public void close() { - mongoDB.getCollection(COLLECTION_NAME).drop(); + mongoDB.getCollection(COLLECTION_QUEUE).drop(); mongoDB.getCollection(COLLECTION_URIS).drop(); client.close(); } @@ -91,9 +91,9 @@ public void close() { public void open() { mongoDB = client.getDatabase(DB_NAME); if(!queueTableExists()) { - mongoDB.createCollection(COLLECTION_NAME); + mongoDB.createCollection(COLLECTION_QUEUE); 
mongoDB.createCollection(COLLECTION_URIS); - MongoCollection mongoCollection = mongoDB.getCollection(COLLECTION_NAME); + MongoCollection mongoCollection = mongoDB.getCollection(COLLECTION_QUEUE); MongoCollection mongoCollectionUris = mongoDB.getCollection(COLLECTION_URIS); mongoCollection.createIndex(Indexes.compoundIndex(Indexes.ascending("ipAddress"), Indexes.ascending("type"))); mongoCollectionUris.createIndex(Indexes.compoundIndex(Indexes.ascending("uri"), Indexes.ascending("ipAddress"),Indexes.ascending("type"))); @@ -103,7 +103,7 @@ public void open() { public boolean queueTableExists() { for(String collection: mongoDB.listCollectionNames()) { - if(collection.toLowerCase().equals(COLLECTION_NAME.toLowerCase())) { + if(collection.toLowerCase().equals(COLLECTION_QUEUE.toLowerCase())) { return true; } } @@ -128,7 +128,7 @@ protected void addToQueue(CrawleableUri uri) { protected Iterator getIterator() { - MongoCursor cursor = mongoDB.getCollection(COLLECTION_NAME).find().iterator(); + MongoCursor cursor = mongoDB.getCollection(COLLECTION_QUEUE).find().iterator(); Iterator ipUriTypePairIterator = new Iterator() { @Override @@ -176,7 +176,7 @@ protected List getUris(IpUriTypePair pair) { LOGGER.error("Error while retrieving uri from MongoDBQueue",e); } - mongoDB.getCollection(COLLECTION_NAME).deleteOne(new Document("ipAddress",pair.ip.getHostAddress()).append("type", pair.type.toString())); + mongoDB.getCollection(COLLECTION_QUEUE).deleteOne(new Document("ipAddress",pair.ip.getHostAddress()).append("type", pair.type.toString())); mongoDB.getCollection(COLLECTION_URIS).deleteMany(new Document("ipAddress",pair.ip.getHostAddress()).append("type", pair.type.toString())); @@ -185,7 +185,7 @@ protected List getUris(IpUriTypePair pair) { public boolean queueContainsIpAddressTypeKey(CrawleableUri curi ,List ipAddressTypeKey) { - Iterator iterator = mongoDB.getCollection(COLLECTION_NAME).find(new Document("ipAddress", ipAddressTypeKey.get(0)). + Iterator iterator = mongoDB.getCollection(COLLECTION_QUEUE).find(new Document("ipAddress", ipAddressTypeKey.get(0)). append("type", ipAddressTypeKey.get(1))).iterator(); if(iterator.hasNext()) { @@ -211,14 +211,24 @@ public void addCrawleableUri(CrawleableUri uri, List ipAddressTypeKey) { } catch (Exception e) { - LOGGER.error("Error while adding uri to MongoDBQueue",e); + if(e instanceof MongoWriteException) + LOGGER.info("Uri: " + uri.getUri().toString() + " already in queue. Ignoring..."); + else + LOGGER.error("Error while adding uri to MongoDBQueue",e); } } public void addCrawleableUri(CrawleableUri uri) { - - mongoDB.getCollection(COLLECTION_NAME).insertOne(crawleableUriToMongoDocument(uri)[0]); + + try { + mongoDB.getCollection(COLLECTION_QUEUE).insertOne(crawleableUriToMongoDocument(uri)[0]); mongoDB.getCollection(COLLECTION_URIS).insertOne(crawleableUriToMongoDocument(uri)[1]); + }catch (Exception e) { + if(e instanceof MongoWriteException) + LOGGER.info("Uri: " + uri.getUri().toString() + " already in queue. 
Ignoring..."); + else + LOGGER.error("Error while adding uri to MongoDBQueue",e); + } LOGGER.debug("Inserted new UriTypePair"); } @@ -270,5 +280,62 @@ public List packTuple(String str_1, String str_2) { pack.add(str_2); return pack; } + + private List createCrawleableUriList(List uris) { + List resultUris = new ArrayList<>(); + + for (Object uriString : uris) { + try { + resultUris.add(serializer.deserialize((byte[]) uriString)); + } catch (Exception e) { + LOGGER.error("Couldn't deserialize uri", e); + } + } + + return resultUris; + } + + @Override + public Iterator>> getIPURIIterator() { + // Lazily resolves the URIs stored for each IP address while iterating over the queue. + return new Iterator>>(){ + + Iterator cursor = getIterator(); + + @Override + public boolean hasNext() { + return cursor.hasNext(); + } + + @Override + public SimpleEntry> next() { + IpUriTypePair pair = cursor.next(); + + Iterator uriDocs = mongoDB.getCollection(COLLECTION_URIS).find(new Document("ipAddress", pair.ip.getHostAddress().toString()) + .append("type", pair.type.toString())).iterator(); + List value = new ArrayList(); + while(uriDocs.hasNext()) { + Document doc = uriDocs.next(); + try { + value.add( serializer.deserialize( ((Binary) doc.get("uri")).getData()) ); + } catch (IOException e) { + LOGGER.error("Was not able to read the field from the MDBQueue \"uris\""); + value.clear(); + } + } + + return new AbstractMap.SimpleEntry<>(pair.ip, value); + } + }; + } }
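The catch blocks above treat every MongoWriteException as a duplicate insert. A tighter variant would inspect the error category, which the MongoDB Java driver exposes; an illustrative sketch follows (insertIgnoringDuplicates is hypothetical, the driver calls are standard, and it assumes the unique indexes created in open()).

import org.bson.Document;
import com.mongodb.ErrorCategory;
import com.mongodb.MongoWriteException;
import com.mongodb.client.MongoCollection;

public class DuplicateInsertExample {
    // Inserts a document, distinguishing duplicate-key violations (expected when the
    // URI is already queued) from genuine write errors.
    static boolean insertIgnoringDuplicates(MongoCollection<Document> collection, Document doc) {
        try {
            collection.insertOne(doc);
            return true;
        } catch (MongoWriteException e) {
            if (e.getError().getCategory() == ErrorCategory.DUPLICATE_KEY) {
                return false; // already present; safe to ignore
            }
            throw e; // a different write problem; surface it
        }
    }
}

diff --git a/squirrel.mockup/pom.xml b/squirrel.mockup/pom.xml index 224da0b1b..9b2c91123 100644 --- a/squirrel.mockup/pom.xml +++ b/squirrel.mockup/pom.xml @@ -6,7 +6,7 @@ org.dice-research squirrel - 0.3.0-SNAPSHOT + 0.3.0 squirrel.mockup jar diff --git a/squirrel.web-api/pom.xml b/squirrel.web-api/pom.xml index 3b1c4dda1..2a4b79668 100644 --- a/squirrel.web-api/pom.xml +++ b/squirrel.web-api/pom.xml @@ -6,8 +6,9 @@ org.dice-research squirrel - 0.3.0-SNAPSHOT + 0.3.0 + squirrel.web-api jar Squirrel Web API diff --git a/squirrel.web/pom.xml b/squirrel.web/pom.xml index 2f2637182..21ab1b163 100644 --- a/squirrel.web/pom.xml +++ b/squirrel.web/pom.xml @@ -1,298 +1,116 @@ - 4.0.0 - - org.dice-research - squirrel - 0.3.0-SNAPSHOT - - squirrel.web - jar - Squirrel Web + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 + + org.dice-research + squirrel + 0.3.0 + + squirrel.web + jar + Squirrel Web - + - - 1.8 - ${project.parent.version} - - com.squirrel.Application - UTF-8 - @ - ${java.version} - UTF-8 - ${java.version} - + + 1.8 + ${project.parent.version} + + com.squirrel.Application + UTF-8 + @ + ${java.version} + UTF-8 + ${java.version} + - - - org.dice-research - squirrel.api - - - org.springframework.boot - spring-boot-starter-web - compile - 2.0.2.RELEASE - - - org.springframework.boot - spring-boot-starter-test - test - 2.0.2.RELEASE - - - com.jayway.jsonpath - json-path - test - 2.4.0 - - - org.springframework - spring-framework-bom - 5.0.6.RELEASE - pom - compile - - - org.springframework.amqp - spring-rabbit - 2.0.3.RELEASE - compile - - - - com.rabbitmq - amqp-client - 5.2.0 - compile - - - org.dice-research - squirrel.web-api - - + + + org.hobbit + core + + + org.springframework.boot + spring-boot-starter-web + 2.0.2.RELEASE + + + org.springframework.boot + spring-boot-starter-tomcat + 2.0.2.RELEASE + provided + + + org.springframework.boot + spring-boot-starter-test + test + 2.0.2.RELEASE + + + com.jayway.jsonpath +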
json-path + test + 2.4.0 + + + org.springframework + spring-framework-bom + 5.0.6.RELEASE + pom + + + com.fasterxml.jackson.core + jackson-databind + 2.9.4 + + + org.springframework.amqp + spring-rabbit + 2.0.3.RELEASE - - - - - true - ${basedir}/src/main/resources - - **/application*.yml - **/application*.yaml - **/application*.properties - - - - ${basedir}/src/main/resources - - **/application*.yml - **/application*.yaml - **/application*.properties - - - - - - - org.jetbrains.kotlin - kotlin-maven-plugin - ${kotlin.version} - - - compile - compile - - compile - - - - test-compile - test-compile - - test-compile - - - - - ${java.version} - true - - - - maven-compiler-plugin - - true - - - - maven-failsafe-plugin - - - - integration-test - verify - - - - - ${project.build.outputDirectory} - - - - maven-jar-plugin - - - - ${start-class} - true - - - - - - maven-war-plugin - - - - ${start-class} - true - - - - - - org.codehaus.mojo - exec-maven-plugin - - ${start-class} - - - - maven-resources-plugin - - - ${resource.delimiter} - - false - - - - pl.project13.maven - git-commit-id-plugin - - - - revision - - - - - true - yyyy-MM-dd'T'HH:mm:ssZ - true - ${project.build.outputDirectory}/git.properties - - - - org.springframework.boot - spring-boot-maven-plugin - - - - repackage - - - - - ${start-class} - - - - maven-shade-plugin - - - package - - shade - - - - - META-INF/spring.handlers - - - META-INF/spring.factories - - - META-INF/spring.schemas - - - - ${start-class} - - - - - - - - org.springframework.boot - spring-boot-maven-plugin - 2.0.2.RELEASE - - - - true - true - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - org.eclipse.m2e - lifecycle-mapping - 1.0.0 - - - - - - org.codehaus.mojo - flatten-maven-plugin - [1.0.0,) - - flatten - - - - - - - - - - - - + + + + com.rabbitmq + amqp-client + 3.6.0 + + + org.dice-research + squirrel.web-api + + + org.dice-research + squirrel.api + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + 2.1.1.RELEASE + + com.squirrel.Application + + + + + repackage + + + + + + + + diff --git a/squirrel.web/src/main/java/com/squirrel/Utilities/HTMLReader.java b/squirrel.web/src/main/java/com/squirrel/Utilities/HTMLReader.java index cf1741aaa..3d897ef3e 100644 --- a/squirrel.web/src/main/java/com/squirrel/Utilities/HTMLReader.java +++ b/squirrel.web/src/main/java/com/squirrel/Utilities/HTMLReader.java @@ -19,12 +19,15 @@ public static String getHTMLErrorPage(String errorMessage) { * @param filename the path and name of the (HTML) file, e.g. {@code ./WEB-INF/pages/index.html} * @return the content of the file (without line brakes) */ - public static String getText(String filename) { + @SuppressWarnings("resource") + public static String getText(String filename) { try { return new BufferedReader(new FileReader(filename)).lines().collect(StringBuilder::new, StringBuilder::append, StringBuilder::append) + ""; } catch (FileNotFoundException e) { if (filename.endsWith("_exception.html")) { - return "Unresolvable errorFILE EXCEPTION. Only files are: " + Arrays.deepToString(new File("./").list()) + ""; + System.out.println("not found " + filename); + return "Unresolvable errorFILE EXCEPTION. Only files are: " + Arrays.deepToString(new File("./").list()) + "" + + "
not found : "+ filename +" "; } return getHTMLErrorPage(e.getMessage()); } diff --git a/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitController.java b/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitController.java index e8077d602..7d4020209 100644 --- a/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitController.java +++ b/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitController.java @@ -29,7 +29,7 @@ public class RabbitController { public SquirrelWebObject observeFrontier(@RequestParam(value = "id", defaultValue = "n/a") String property, @RequestParam(value = "percent", defaultValue = "false") String percent) { SquirrelWebObject o; try { - int id = Boolean.parseBoolean(percent) ? (int) ((Integer.parseInt(property) / 100f) * Application.listenerThread.countSquirrelWebObjects()) : Integer.parseInt(property); + int id = Boolean.parseBoolean(percent) ? (int) ((Integer.parseInt(property) / 100f) * Application.listenerThread.countSquirrelWebObjects()) : Integer.parseInt(property); o = Application.listenerThread.getSquirrel(id); } catch (NumberFormatException e) { o = Application.listenerThread.getSquirrel(); diff --git a/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitMQListener.java b/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitMQListener.java index db9330f76..8aeced89c 100644 --- a/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitMQListener.java +++ b/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitMQListener.java @@ -1,28 +1,33 @@ package com.squirrel.rabbit; -import com.SquirrelWebObject; -import com.graph.VisualisationGraph; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; +import java.awt.Color; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; + +import javax.validation.constraints.NotNull; import org.apache.commons.io.IOUtils; import org.dice_research.squirrel.data.uri.CrawleableUriFactoryImpl; import org.dice_research.squirrel.data.uri.serialize.Serializer; import org.dice_research.squirrel.data.uri.serialize.java.GzipJavaUriSerializer; import org.dice_research.squirrel.rabbit.msgs.UriSet; -import org.hobbit.core.rabbit.*; +import org.hobbit.core.rabbit.DataHandler; +import org.hobbit.core.rabbit.DataReceiver; +import org.hobbit.core.rabbit.DataReceiverImpl; +import org.hobbit.core.rabbit.RabbitQueueFactory; +import org.hobbit.core.rabbit.RabbitQueueFactoryImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.validation.constraints.NotNull; -import java.awt.*; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeoutException; +import com.SquirrelWebObject; +import com.graph.VisualisationGraph; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; /** * The interface between the RabbitMQ and the Web-Service @@ -76,7 +81,7 @@ private boolean queueDeclare(String queueName) { try { channel.queueDeclare(queueName, false, false, false, null); } catch (IOException e) { - logger.error("I have a connection to " + connection.getClientProvidedName() + " with the channel number " + channel.getChannelNumber() + ", but I was not able to declare a queue :(", e); +// logger.error("I have a connection to " + 
diff --git a/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitMQListener.java b/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitMQListener.java
index db9330f76..8aeced89c 100644
--- a/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitMQListener.java
+++ b/squirrel.web/src/main/java/com/squirrel/rabbit/RabbitMQListener.java
@@ -1,28 +1,33 @@
 package com.squirrel.rabbit;
 
-import com.SquirrelWebObject;
-import com.graph.VisualisationGraph;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
+import java.awt.Color;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
+
+import javax.validation.constraints.NotNull;
 
 import org.apache.commons.io.IOUtils;
 import org.dice_research.squirrel.data.uri.CrawleableUriFactoryImpl;
 import org.dice_research.squirrel.data.uri.serialize.Serializer;
 import org.dice_research.squirrel.data.uri.serialize.java.GzipJavaUriSerializer;
 import org.dice_research.squirrel.rabbit.msgs.UriSet;
-import org.hobbit.core.rabbit.*;
+import org.hobbit.core.rabbit.DataHandler;
+import org.hobbit.core.rabbit.DataReceiver;
+import org.hobbit.core.rabbit.DataReceiverImpl;
+import org.hobbit.core.rabbit.RabbitQueueFactory;
+import org.hobbit.core.rabbit.RabbitQueueFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.validation.constraints.NotNull;
-import java.awt.*;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
+import com.SquirrelWebObject;
+import com.graph.VisualisationGraph;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
 
 /**
  * The interface between the RabbitMQ and the Web-Service
@@ -76,7 +81,7 @@ private boolean queueDeclare(String queueName) {
         try {
             channel.queueDeclare(queueName, false, false, false, null);
         } catch (IOException e) {
-            logger.error("I have a connection to " + connection.getClientProvidedName() + " with the channel number " + channel.getChannelNumber() + ", but I was not able to declare a queue :(", e);
+//            logger.error("I have a connection to " + connection.getClientProvidedName() + " with the channel number " + channel.getChannelNumber() + ", but I was not able to declare a queue :(", e);
             return false;
         }
         logger.info("Queue declaration succeeded with the name " + queueName + " [" + channel.getChannelNumber() + "]");
@@ -91,13 +96,15 @@ private boolean queueDeclare(String queueName) {
      * @return {@code true}, if the connection to the RabbitMQ was established, otherwise {@code false}
      */
     private boolean rabbitConnect(int triesLeft) {
+        String host = System.getenv("HOST") == null ? "localhost" : System.getenv("HOST");
         ConnectionFactory factory = new ConnectionFactory();
-        factory.setHost("rabbit");
+        factory.setHost(host);
         factory.setUsername("guest");
         factory.setPassword("guest");
         connection = null;
         try {
             connection = factory.newConnection();
+            connection.getClientProperties();
             channel = connection.createChannel();
         } catch (IOException e) {
             if (triesLeft > 0) {
@@ -129,7 +136,7 @@ private boolean rabbitConnect(int triesLeft) {
             }
         }
 
-        logger.info("Connection to rabbit succeeded: " + factory.getHost() + " - " + connection.getClientProvidedName() + " [" + connection.getId() + "]");
+        logger.info("Connection to rabbit succeeded: " + factory.getHost());
         return true;
     }
diff --git a/squirrel.worker/pom.xml b/squirrel.worker/pom.xml
index 6e20680b3..5326e9f4e 100644
--- a/squirrel.worker/pom.xml
+++ b/squirrel.worker/pom.xml
@@ -6,7 +6,7 @@
 	<groupId>org.dice-research</groupId>
 	<artifactId>squirrel</artifactId>
-	<version>0.3.0-SNAPSHOT</version>
+	<version>0.3.0</version>
 	</parent>
 
 	<artifactId>squirrel.worker</artifactId>
 	<packaging>jar</packaging>
diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/HDTAnalyzer.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/HDTAnalyzer.java
index 1d74b5b04..4b8f864f9 100644
--- a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/HDTAnalyzer.java
+++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/HDTAnalyzer.java
@@ -14,7 +14,6 @@
 import org.apache.tika.Tika;
 import org.dice_research.squirrel.Constants;
 import org.dice_research.squirrel.analyzer.AbstractAnalyzer;
-import org.dice_research.squirrel.analyzer.Analyzer;
 import org.dice_research.squirrel.collect.UriCollector;
 import org.dice_research.squirrel.data.uri.CrawleableUri;
 import org.dice_research.squirrel.metadata.ActivityUtil;
@@ -26,6 +25,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Analyzer to parse files in the HDT format.
+ *
+ * @author gsjunior gsjunior@mail.uni-paderborn.de
+ */
+
 public class HDTAnalyzer extends AbstractAnalyzer {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(HDTAnalyzer.class);
diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/ckan/CkanJsonAnalyzer.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/ckan/CkanJsonAnalyzer.java
index df4ce9b78..db6d588a9 100644
--- a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/ckan/CkanJsonAnalyzer.java
+++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/ckan/CkanJsonAnalyzer.java
@@ -9,6 +9,7 @@
 import java.util.stream.Stream;
 
 import org.dice_research.squirrel.Constants;
+import org.dice_research.squirrel.analyzer.AbstractAnalyzer;
 import org.dice_research.squirrel.analyzer.Analyzer;
 import org.dice_research.squirrel.collect.UriCollector;
 import org.dice_research.squirrel.data.uri.CrawleableUri;
@@ -30,7 +31,7 @@
  * @author Michael Röder (michael.roeder@uni-paderborn.de)
  *
  */
-public class CkanJsonAnalyzer implements Analyzer {
+public class CkanJsonAnalyzer extends AbstractAnalyzer {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CkanJsonAnalyzer.class);
 
@@ -38,6 +39,7 @@ public class CkanJsonAnalyzer implements Analyzer {
     protected ObjectMapper mapper;
 
     public CkanJsonAnalyzer(UriCollector collector) {
+        super(collector);
         this.collector = collector;
         mapper = new ObjectMapper();
         mapper.registerModule(new JackanModule());
diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/HTMLScraperAnalyzer.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/HTMLScraperAnalyzer.java
index ddcf0a92a..24ffd0724 100644
--- a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/HTMLScraperAnalyzer.java
+++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/HTMLScraperAnalyzer.java
@@ -45,7 +45,7 @@ public Iterator<byte[]> analyze(CrawleableUri curi, File data, Sink sink) {
             return collector.getUris(curi);
         } catch (Exception e) {
-            LOGGER.warn("Could not analyze file for URI: " + curi.getUri().toString() + " :: Analyzer: " + this.getClass().getName());
+            LOGGER.warn("Could not analyze file for URI: " + curi.getUri().toString() + " :: Analyzer: " + this.getClass().getName(), e);
         }
         return null;
     }
diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/HtmlScraper.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/HtmlScraper.java
index 108268366..71c874dbb 100644
--- a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/HtmlScraper.java
+++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/HtmlScraper.java
@@ -11,14 +11,11 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Stack;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.jena.graph.Node;
 import org.apache.jena.graph.NodeFactory;
 import org.apache.jena.graph.Triple;
-import org.apache.jena.tdb.store.Hash;
+import org.apache.jena.rdf.model.ResourceFactory;
 import org.dice_research.squirrel.analyzer.impl.html.scraper.exceptions.ElementNotFoundException;
 import org.dice_research.squirrel.data.uri.UriUtils;
 import org.jsoup.Jsoup;
@@ -31,6 +28,10 @@
 
 /**
+ * HtmlScraper extracts triples from HTML data based on preconfigured YAML files.
+ *
+ * @author gsjunior
  */
 public class HtmlScraper {
@@ -44,6 +45,8 @@ public class HtmlScraper {
     private String uri;
     private String label;
     private Document doc;
+    private Map<String, List<Triple>> staticMap = new HashMap<String, List<Triple>>();
+    private Map<String, List<Triple>> selectedMap = new HashMap<String, List<Triple>>();
 
     public HtmlScraper(File file) {
@@ -63,7 +66,8 @@ public HtmlScraper() {
 
     }
 
-    public List<Triple> scrape(String uri, File filetToScrape) throws Exception {
+    @SuppressWarnings("unchecked")
+    public List<Triple> scrape(String uri, File filetToScrape) throws Exception {
         List<Triple> listTriples = new ArrayList<Triple>();
         listIterableObjects = new LinkedHashSet<Object>();
@@ -71,15 +75,50 @@ public List<Triple> scrape(String uri, File filetToScrape) throws Exception {
 
         YamlFile yamlFile = (YamlFile) yamlFiles.get(UriUtils.getDomainName(uri)).clone();
-
+        this.uri = uri;
+        if (uri.contains("?")) {
+            this.label = uri.substring(uri.lastIndexOf("/") + 1, uri.lastIndexOf("?"));
+        } else {
+            this.label = uri.substring(uri.lastIndexOf("/") + 1, uri.length());
+        }
         if ((boolean) yamlFile.getFile_descriptor().get(YamlFileAtributes.SEARCH_CHECK).get("ignore-request") && uri.contains("?")) {
-
-            uri = uri.substring(0, uri.indexOf("?"));
-
+            label = uri.substring(uri.lastIndexOf("/") + 1, uri.lastIndexOf("?"));
+            this.uri = uri.substring(0, uri.indexOf("?"));
         }
 
         if (yamlFile != null) {
 //            yamlFile.getFile_descriptor().remove(YamlFileAtributes.SEARCH_CHECK);
+
+            if (yamlFile.getFile_descriptor().get(YamlFileAtributes.SEARCH_CHECK).get("static-resources") != null) {
+
+                for (Entry<String, Object> entry : ((HashMap<String, Object>) yamlFile.getFile_descriptor().get(YamlFileAtributes.SEARCH_CHECK)
+                        .get("static-resources")).entrySet()) {
+                    for (Entry<String, Object> typesEntry : ((HashMap<String, Object>) entry.getValue()).entrySet()) {
+                        Node s = NodeFactory.createURI(typesEntry.getKey());
+                        List<Triple> listTriple = new ArrayList<Triple>();
+                        for (Entry<String, Object> valuesEntry : ((HashMap<String, Object>) typesEntry.getValue()).entrySet()) {
+                            Node o;
+                            Node p = NodeFactory.createURI(valuesEntry.getKey());
+
+                            try {
+                                new URL(valuesEntry.getValue().toString());
+                                o = NodeFactory.createURI(valuesEntry.getValue().toString());
+                            } catch (MalformedURLException e) {
+                                o = NodeFactory.createLiteral(valuesEntry.getValue().toString());
+                            }
+
+                            Triple t = new Triple(s, p, o);
+                            listTriple.add(t);
+                        }
+                        staticMap.put(entry.getKey().toLowerCase(), listTriple);
+                    }
+                }
+
+//                for (Object entry : yamlFile.getFile_descriptor().get(YamlFileAtributes.SEARCH_CHECK).get("static-resources")) {
+//
+//                }
+            }
+
             for (Entry<String, Map<String, Object>> entry : yamlFile.getFile_descriptor().entrySet()) {
                 for (Entry<String, Object> cfg : entry.getValue().entrySet()) {
@@ -201,8 +240,12 @@ private Set<Triple> scrapeDownloadLink(Map<String, Object> resources, File htmlFile) throws Exception {
 
         List resourcesList = new ArrayList();
 
-        this.uri = uri;
-        this.label = uri.substring(uri.lastIndexOf("/")+1, uri.length());
+
+//        if(uri.contains("?")) {
+//            this.label = uri.substring(uri.lastIndexOf("/")+1, uri.lastIndexOf("?"));
+//        }else {
+//            this.label = uri.substring(uri.lastIndexOf("/")+1, uri.length());
+//        }
 
         for (Entry<String, Object> entry : resources.entrySet()) {
@@ -217,6 +260,12 @@ private Set<Triple> scrapeDownloadLink(Map<String, Object> resources, File htmlFile) throws Exception {
             }
         }
+
+        if (!selectedMap.isEmpty()) {
+            for (Entry<String, List<Triple>> entry : selectedMap.entrySet()) {
+                triples.addAll(entry.getValue());
+            }
+        }
         return triples;
     }
@@ -253,7 +302,7 @@ private Set<Triple> scrapeTree(Map<String, Object> mapEntry, Set<Triple> triples, Stack<Node> stackNode) {
             triples.addAll(scrapeTree((Map<String, Object>) entry.getValue(), triples, stackNode));
         } else if (entry.getValue() instanceof String) {
-            Node p = NodeFactory.createURI(entry.getKey());
+            Node p = ResourceFactory.createResource(entry.getKey()).asNode();
             List<Node> o = jsoupQuery((String) entry.getValue());
             if (o.isEmpty()) {
LOGGER.warn("Element "+ entry.getKey() + ": " + entry.getValue() + " not found or does not exist"); @@ -281,12 +330,15 @@ private List jsoupQuery(String cssQuery) throws MalformedURLException { List listNodes = new ArrayList(); + @SuppressWarnings("unused") + boolean useResource = false; + try { - if(cssQuery.startsWith("l")) { + if(cssQuery.startsWith("l(")) { String val = cssQuery.substring(cssQuery.indexOf("(")+1,cssQuery.lastIndexOf(")")); - String label = uri.substring(uri.lastIndexOf("/")+1, uri.length()); +// String label = uri.substring(uri.lastIndexOf("/")+1, uri.lastIndexOf("?")); if (val.contains("$uri")) { val = val.replaceAll("\\$uri", uri); @@ -303,7 +355,12 @@ private List jsoupQuery(String cssQuery) throws MalformedURLException { arrayElements[0] = el; elements = new Elements(arrayElements); }else { - elements = doc.select(cssQuery); + + if(cssQuery.startsWith("res(")) { + useResource = true; + cssQuery = cssQuery.substring(cssQuery.indexOf("(")+1, cssQuery.lastIndexOf(")") ); + } + elements = doc.select(cssQuery); if (elements.isEmpty()) { throw new ElementNotFoundException("Element (" + cssQuery + ")" @@ -324,9 +381,12 @@ private List jsoupQuery(String cssQuery) throws MalformedURLException { String path = elements.get(i).attr("href"); String base = url.getProtocol() + "://" + url.getHost() + path; listNodes.add(NodeFactory.createURI(base)); - } else { + }else { listNodes.add(NodeFactory.createURI(elements.get(i).attr("abs:href"))); } + }else if(useResource) { + listNodes.add(staticMap.get(elements.get(i).text().toLowerCase()).get(0).getSubject()); + selectedMap.put(elements.get(i).text().toLowerCase(), staticMap.get(elements.get(i).text().toLowerCase())); } else { boolean uriFlag = true; diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/YamlFilesParser.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/YamlFilesParser.java index c5ec5848a..680df176b 100644 --- a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/YamlFilesParser.java +++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/YamlFilesParser.java @@ -18,6 +18,8 @@ import java.util.stream.Collectors; /** + * Parser for Yaml Files, for the HTML Scraper + * * @author gsjunior */ public class YamlFilesParser { diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/components/WorkerComponent.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/components/WorkerComponent.java index 083c682ce..83db57467 100644 --- a/squirrel.worker/src/main/java/org/dice_research/squirrel/components/WorkerComponent.java +++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/components/WorkerComponent.java @@ -67,12 +67,8 @@ public class WorkerComponent extends AbstractComponent implements Frontier { @Override public void init() throws Exception { super.init(); - if (worker == null || sender == null || client == null || serializer == null) { - LOGGER.warn("The SPRING-config autowire service was not (totally) working. 
diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/YamlFilesParser.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/YamlFilesParser.java
index c5ec5848a..680df176b 100644
--- a/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/YamlFilesParser.java
+++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/analyzer/impl/html/scraper/YamlFilesParser.java
@@ -18,6 +18,8 @@
 import java.util.stream.Collectors;
 
 /**
+ * Parser for the YAML files used by the HTML scraper.
+ *
  * @author gsjunior
  */
 public class YamlFilesParser {
diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/components/WorkerComponent.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/components/WorkerComponent.java
index 083c682ce..83db57467 100644
--- a/squirrel.worker/src/main/java/org/dice_research/squirrel/components/WorkerComponent.java
+++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/components/WorkerComponent.java
@@ -67,12 +67,8 @@ public class WorkerComponent extends AbstractComponent implements Frontier {
     @Override
     public void init() throws Exception {
         super.init();
-        if (worker == null || sender == null || client == null || serializer == null) {
-            LOGGER.warn("The SPRING-config autowire service was not (totally) working. We must do the instantiation in the WorkerComponent!");
-            initWithoutSpring();
-        }
-        UriSetRequest uriSetReq = new UriSetRequest(worker.getUri(), false);
+        UriSetRequest uriSetReq = new UriSetRequest(worker.getId(), false);
 
         uriSetRequest = serializer.serialize(uriSetReq);
@@ -93,7 +89,7 @@ public void init() throws Exception {
             @Override
             public void run() {
                 try {
-                    senderFrontier.sendData(serializer.serialize(new AliveMessage(worker.getUri())));
+                    senderFrontier.sendData(serializer.serialize(new AliveMessage(worker.getId())));
                 } catch (IOException e) {
                     LOGGER.warn(e.toString());
                 }
@@ -106,25 +102,25 @@ public void run() {
 
     }
 
-    private void initWithoutSpring() throws Exception {
-        super.init();
-
-        WorkerConfiguration workerConfiguration = WorkerConfiguration.getWorkerConfiguration();
-
-        Sink sink;
-        if (workerConfiguration.getSparqlHost() == null || workerConfiguration.getSqarqlPort() == null) {
-            sink = new FileBasedSink(new File(workerConfiguration.getOutputFolder()), true);
-        } else {
-            String httpPrefix = "http://" + workerConfiguration.getSparqlHost() + ":" + workerConfiguration.getSqarqlPort() + "/";
-            sink = new SparqlBasedSink(workerConfiguration.getSparqlHost().toString(), workerConfiguration.getSqarqlPort().toString(), "contentset/update", "contentset/query", "MetaData/update", "MetaData/query");
-        }
-
-        serializer = new GzipJavaUriSerializer();
-
-//        worker = new WorkerImpl(this, sink, new RobotsManagerImpl(new SimpleHttpFetcher(new UserAgent(Constants.DEFAULT_USER_AGENT, "", ""))), serializer, SqlBasedUriCollector.create(serializer), 2000, workerConfiguration.getOutputFolder() + File.separator + "log", true);
-        sender = DataSenderImpl.builder().queue(outgoingDataQueuefactory, Constants.FRONTIER_QUEUE_NAME).build();
-        client = RabbitRpcClient.create(outgoingDataQueuefactory.getConnection(), Constants.FRONTIER_QUEUE_NAME);
-    }
+//    private void initWithoutSpring() throws Exception {
+//        super.init();
+//
+//        WorkerConfiguration workerConfiguration = WorkerConfiguration.getWorkerConfiguration();
+//
+//        Sink sink;
+//        if (workerConfiguration.getSparqlHost() == null || workerConfiguration.getSqarqlPort() == null) {
+//            sink = new FileBasedSink(new File(workerConfiguration.getOutputFolder()), true);
+//        } else {
+//            String httpPrefix = "http://" + workerConfiguration.getSparqlHost() + ":" + workerConfiguration.getSqarqlPort() + "/";
+//            sink = new SparqlBasedSink(workerConfiguration.getSparqlHost().toString(), workerConfiguration.getSqarqlPort().toString(), "contentset/update", "contentset/query", "MetaData/update", "MetaData/query");
+//        }
+//
+//        serializer = new GzipJavaUriSerializer();
+//
+////        worker = new WorkerImpl(this, sink, new RobotsManagerImpl(new SimpleHttpFetcher(new UserAgent(Constants.DEFAULT_USER_AGENT, "", ""))), serializer, SqlBasedUriCollector.create(serializer), 2000, workerConfiguration.getOutputFolder() + File.separator + "log", true);
+//        sender = DataSenderImpl.builder().queue(outgoingDataQueuefactory, Constants.FRONTIER_QUEUE_NAME).build();
+//        client = RabbitRpcClient.create(outgoingDataQueuefactory.getConnection(), Constants.FRONTIER_QUEUE_NAME);
+//    }
 
     @Override
     public void run() {
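With initWithoutSpring() now dead code, the worker only comes up if the Spring context supplies every collaborator, including the fetcher that WorkerImpl no longer builds itself (see the WorkerImpl hunk further down). A sketch of the construction the context must provide; the fetcher order, the timeout, and the fully qualified names of the Sink/RobotsManager/Frontier collaborators are assumptions, while the fetcher classes are exactly the ones this patch removes from WorkerImpl's imports:

    import java.io.File;

    import org.dice_research.squirrel.data.uri.serialize.Serializer;
    import org.dice_research.squirrel.data.uri.serialize.java.GzipJavaUriSerializer;
    import org.dice_research.squirrel.fetcher.Fetcher;
    import org.dice_research.squirrel.fetcher.ckan.java.SimpleCkanFetcher;
    import org.dice_research.squirrel.fetcher.ftp.FTPFetcher;
    import org.dice_research.squirrel.fetcher.http.HTTPFetcher;
    import org.dice_research.squirrel.fetcher.manage.SimpleOrderedFetcherManager;
    import org.dice_research.squirrel.fetcher.sparql.SparqlBasedFetcher;
    import org.dice_research.squirrel.worker.Worker;
    import org.dice_research.squirrel.worker.impl.WorkerImpl;

    public class WorkerWiringSketch {
        // Mirrors the WorkerImpl constructor signature introduced by this patch.
        public static Worker newWorker(org.dice_research.squirrel.frontier.Frontier frontier,
                org.dice_research.squirrel.sink.Sink sink,
                org.dice_research.squirrel.analyzer.Analyzer analyzer,
                org.dice_research.squirrel.robots.RobotsManager manager,
                org.dice_research.squirrel.collect.UriCollector collector,
                String outputFolder) {
            Fetcher fetcher = new SimpleOrderedFetcherManager(
                    new HTTPFetcher(), new SimpleCkanFetcher(), new FTPFetcher(), new SparqlBasedFetcher());
            Serializer serializer = new GzipJavaUriSerializer();
            return new WorkerImpl(frontier, fetcher, sink, analyzer, manager, serializer,
                    collector, 2000, outputFolder + File.separator + "log", true);
        }
    }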
diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/ckan/java/SimpleCkanFetcher.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/ckan/java/SimpleCkanFetcher.java
index f2ea0ecaa..afe8fb097 100644
--- a/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/ckan/java/SimpleCkanFetcher.java
+++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/ckan/java/SimpleCkanFetcher.java
@@ -40,7 +40,7 @@ public class SimpleCkanFetcher implements Fetcher {
     public static final String CKAN_JSON_OBJECT_MIME_TYPE = "ckan/json";
     public static final byte NEWLINE_CHAR = '\n';
 
-    protected boolean checkForUriType = true;
+    protected boolean checkForUriType = false;
     protected File dataDirectory = FileUtils.getTempDirectory();
     protected ObjectMapper mapper;
 
@@ -58,7 +58,9 @@ public void close() throws IOException {
     @Override
     public File fetch(CrawleableUri uri) {
         // If this is a CKAN API URI or we do not check it at all
+        LOGGER.info("Starting CKAN fetcher...");
         if (!checkForUriType || CKAN_API_URI_TYPE_VALUE.equals(uri.getData(Constants.URI_TYPE_KEY))) {
+            LOGGER.info("Fetching " + uri.getUri().toString());
             CkanClient client = null;
             OutputStream out = null;
             try {
@@ -73,6 +75,7 @@ public File fetch(CrawleableUri uri) {
                 // If we reached this point, we should add a flag that the file contains CKAN JSON
                 uri.addData(Constants.URI_HTTP_MIME_TYPE_KEY, CKAN_JSON_OBJECT_MIME_TYPE);
                 ActivityUtil.addStep(uri, getClass());
+                uri.addData(Constants.URI_HTTP_MIME_TYPE_KEY, "CKAN_API");
                 return dataFile;
             } catch (CkanException e) {
                 LOGGER.info("The given URI does not seem to be a CKAN URI. Returning null. Exception: " + e.getMessage());
diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/manage/SimpleOrderedFetcherManager.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/manage/SimpleOrderedFetcherManager.java
index 9614f2626..19da49209 100644
--- a/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/manage/SimpleOrderedFetcherManager.java
+++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/manage/SimpleOrderedFetcherManager.java
@@ -27,7 +27,6 @@ public class SimpleOrderedFetcherManager implements Fetcher {
 
     private Fetcher[] fetchers;
 
-    @Autowired
     public SimpleOrderedFetcherManager(Fetcher... fetchers) {
         this.fetchers = fetchers;
     }
diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/sparql/SparqlBasedFetcher.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/sparql/SparqlBasedFetcher.java
index 566dc8042..901fad115 100644
--- a/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/sparql/SparqlBasedFetcher.java
+++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/sparql/SparqlBasedFetcher.java
@@ -38,7 +38,7 @@ public class SparqlBasedFetcher implements Fetcher {
     /**
      * The delay that the system will have between sending two queries.
      */
-    private static final int DELAY = 5000;
+    private static final int DELAY = 1000;
 
     private static final String SELECT_ALL_TRIPLES_QUERY = "SELECT ?s ?p ?o {?s ?p ?o}";
 
@@ -92,7 +92,8 @@ protected QueryExecutionFactory initQueryExecution(String uri) throws ClassNotFoundException, SQLException {
         qef = new QueryExecutionFactoryHttp(uri);
         qef = new QueryExecutionFactoryDelay(qef, DELAY);
         try {
-            return new QueryExecutionFactoryPaginated(qef, 100);
+            LOGGER.info("Starting to query URI: " + uri);
+            return new QueryExecutionFactoryPaginated(qef, 1000);
         } catch (Exception e) {
             LOGGER.info("Couldn't create Factory with pagination. Returning Factory without pagination. Exception: {}",
                     e.getLocalizedMessage());
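The hunk above keeps the three-layer factory (HTTP transport, fixed delay, transparent pagination) as the whole query pipeline of this fetcher. A self-contained sketch of how such a stack is consumed; the endpoint URL and query are illustrative:

    import org.aksw.jena_sparql_api.core.QueryExecutionFactory;
    import org.aksw.jena_sparql_api.delay.core.QueryExecutionFactoryDelay;
    import org.aksw.jena_sparql_api.http.QueryExecutionFactoryHttp;
    import org.aksw.jena_sparql_api.pagination.core.QueryExecutionFactoryPaginated;
    import org.apache.jena.query.QueryExecution;
    import org.apache.jena.query.ResultSet;

    public class PaginatedQuerySketch {
        public static void main(String[] args) {
            QueryExecutionFactory qef = new QueryExecutionFactoryHttp("http://localhost:3030/ds/query");
            qef = new QueryExecutionFactoryDelay(qef, 1000);     // at most one request per second
            qef = new QueryExecutionFactoryPaginated(qef, 1000); // adds LIMIT/OFFSET pages of 1000
            try (QueryExecution qe = qef.createQueryExecution("SELECT ?s ?p ?o { ?s ?p ?o }")) {
                ResultSet rs = qe.execSelect();
                while (rs.hasNext()) {
                    System.out.println(rs.next());
                }
            }
        }
    }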
Exception: {}", e.getLocalizedMessage()); diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/sparql/SparqlDatasetFetcher.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/sparql/SparqlDatasetFetcher.java new file mode 100644 index 000000000..71569b13f --- /dev/null +++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/fetcher/sparql/SparqlDatasetFetcher.java @@ -0,0 +1,215 @@ +package org.dice_research.squirrel.fetcher.sparql; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.sql.SQLException; +import java.util.Iterator; + +import org.aksw.jena_sparql_api.core.QueryExecutionFactory; +import org.aksw.jena_sparql_api.delay.core.QueryExecutionFactoryDelay; +import org.aksw.jena_sparql_api.http.QueryExecutionFactoryHttp; +import org.aksw.jena_sparql_api.pagination.core.QueryExecutionFactoryPaginated; +import org.apache.commons.io.FileUtils; +import org.apache.jena.graph.Triple; +import org.apache.jena.query.Query; +import org.apache.jena.query.QueryExecution; +import org.apache.jena.query.QueryFactory; +import org.apache.jena.query.QuerySolution; +import org.apache.jena.query.ResultSet; +import org.apache.jena.riot.RDFDataMgr; +import org.apache.jena.sparql.engine.http.QueryExceptionHTTP; +import org.apache.tika.io.IOUtils; +import org.dice_research.squirrel.Constants; +import org.dice_research.squirrel.data.uri.CrawleableUri; +import org.dice_research.squirrel.fetcher.Fetcher; +import org.dice_research.squirrel.metadata.ActivityUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * A simple {@link Fetcher} for SPARQL that tries to get DataSets from a SPARQL + * endpoint using the query {@value #DATA_SET_QUERY}. + * + * @author Geraldo de Souza Jr (gsjunior@uni-paderborn.de) + * + */ +@Component +public class SparqlDatasetFetcher implements Fetcher { + + private static final Logger LOGGER = LoggerFactory.getLogger(SparqlDatasetFetcher.class); + + /** + * The delay that the system will have between sending two queries. + */ + + protected String dataSetQuery = "select ?s where {?s a .} " ; + protected String graphQuery = + "construct { ?s ?p ?o. " + + "?o ?p2 ?o2. } " + + "where { " + + "graph ?g { " + + "?s ?p ?o. " + + "OPTIONAL { ?o ?p2 ?o2.} " + + "} " + + "}"; + + + + protected int delay; + protected int limit = 0; + protected File dataDirectory = FileUtils.getTempDirectory(); + + public SparqlDatasetFetcher() { + + } + + + public SparqlDatasetFetcher(int delay, int begin, int limit) { + this.delay = delay; + dataSetQuery += "OFFSET " + begin; + this.limit = limit; + + } + + public SparqlDatasetFetcher(int delay){ + this.delay = delay; + } + + @Override + public File fetch(CrawleableUri uri) { + // Check whether we can be sure that it is a SPARQL endpoint + boolean shouldBeSparql = Constants.URI_TYPE_VALUE_SPARQL.equals(uri.getData(Constants.URI_TYPE_KEY)); + QueryExecutionFactory qef = null; + QueryExecution execution = null; + File dataFile = null; + OutputStream out = null; + try { + // Create query execution instance + qef = initQueryExecution(uri.getUri().toString()); + // create temporary file + try { + dataFile = File.createTempFile("fetched_", "", dataDirectory); + out = new BufferedOutputStream(new FileOutputStream(dataFile)); + } catch (IOException e) { + LOGGER.error("Couldn't create temporary file for storing fetched data. 
Returning null.", e); + return null; + } + + execution = qef.createQueryExecution(dataSetQuery); + ResultSet resultSet = execution.execSelect(); + + int i = 0; + + while(resultSet.hasNext()) { + + if(limit != 0 && i>limit) { + LOGGER.info("LIMIT REACHED, STOPING EXECUTION"); + execution.close(); + break; + } + + + QuerySolution soln = resultSet.nextSolution() ; + String dataSetResource = soln.get("s").toString() ; // Get a result variable by name. + LOGGER.info("- Now Fetching - " + i + ": " + dataSetResource); + + Query query = QueryFactory.create(graphQuery.replaceAll("\\?s", "<" + dataSetResource + ">")) ; + + + boolean tryAgain = true; + + while(tryAgain) { + + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + LOGGER.error("An error occurred when fetching URI: " + uri.getUri().toString() + ,e); + } + + try { + QueryExecution qexecGraph = org.apache.jena.query.QueryExecutionFactory.createServiceRequest(uri.getUri().toString(), query); + + Iterator triples = qexecGraph.execConstructTriples(); + + RDFDataMgr.writeTriples(out, new SelectedTriplesIterator(triples)); + tryAgain = false; + + i++; + }catch(QueryExceptionHTTP e) { + + if(e.getResponseCode() == 404 || e.getResponseCode() == 500) { + tryAgain = true; + LOGGER.info("Error while fetching " +dataSetResource + ". Trying again..."); + } + + } + } + } + + +// RDFDataMgr.writeTriples(out, new SelectedTriplesIterator(resultSet)); + } catch (Throwable e) { + // If this should have worked, print a message, otherwise silently return null + if (shouldBeSparql) { + LOGGER.error("Couldn't create QueryExecutionFactory for \"" + uri.getUri() + "\". Returning -1."); + ActivityUtil.addStep(uri, getClass(), e.getMessage()); + } + return null; + } finally { + IOUtils.closeQuietly(out); + if (execution != null) { + execution.close(); + } + if (qef != null) { + qef.close(); + } + } + ActivityUtil.addStep(uri, getClass()); + return dataFile; + } + + protected QueryExecutionFactory initQueryExecution(String uri) throws ClassNotFoundException, SQLException { + QueryExecutionFactory qef; + qef = new QueryExecutionFactoryHttp(uri); + qef = new QueryExecutionFactoryDelay(qef, delay); + try { + LOGGER.info("Starting to Query uri:" + uri); + return new QueryExecutionFactoryPaginated(qef, 2000); + } catch (Exception e) { + LOGGER.info("Couldn't create Factory with pagination. Returning Factory without pagination. 
Exception: {}", + e.getLocalizedMessage()); + return qef; + } + } + + @Override + public void close() throws IOException { + // nothing to do + } + + protected static class SelectedTriplesIterator implements Iterator { + private Iterator triples; + + public SelectedTriplesIterator(Iterator triples) { + this.triples = triples; + } + + @Override + public boolean hasNext() { + return triples.hasNext(); + } + + @Override + public Triple next() { + return triples.next(); + } + + } + + +} diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/worker/impl/WorkerImpl.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/worker/impl/WorkerImpl.java index 0876b1f37..9c8b5a4bd 100644 --- a/squirrel.worker/src/main/java/org/dice_research/squirrel/worker/impl/WorkerImpl.java +++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/worker/impl/WorkerImpl.java @@ -18,11 +18,6 @@ import org.dice_research.squirrel.data.uri.CrawleableUri; import org.dice_research.squirrel.data.uri.serialize.Serializer; import org.dice_research.squirrel.fetcher.Fetcher; -import org.dice_research.squirrel.fetcher.ckan.java.SimpleCkanFetcher; -import org.dice_research.squirrel.fetcher.ftp.FTPFetcher; -import org.dice_research.squirrel.fetcher.http.HTTPFetcher; -import org.dice_research.squirrel.fetcher.manage.SimpleOrderedFetcherManager; -import org.dice_research.squirrel.fetcher.sparql.SparqlBasedFetcher; import org.dice_research.squirrel.frontier.Frontier; import org.dice_research.squirrel.metadata.CrawlingActivity; import org.dice_research.squirrel.metadata.CrawlingActivity.CrawlingURIState; @@ -35,7 +30,6 @@ import org.dice_research.squirrel.worker.Worker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; /** * Standard implementation of the {@link Worker} interface. @@ -102,10 +96,11 @@ public class WorkerImpl implements Worker, Closeable { * The directory to which a domain log will be written (or * {@code null} if no log should be written). 
diff --git a/squirrel.worker/src/main/java/org/dice_research/squirrel/worker/impl/WorkerImpl.java b/squirrel.worker/src/main/java/org/dice_research/squirrel/worker/impl/WorkerImpl.java
index 0876b1f37..9c8b5a4bd 100644
--- a/squirrel.worker/src/main/java/org/dice_research/squirrel/worker/impl/WorkerImpl.java
+++ b/squirrel.worker/src/main/java/org/dice_research/squirrel/worker/impl/WorkerImpl.java
@@ -18,11 +18,6 @@
 import org.dice_research.squirrel.data.uri.CrawleableUri;
 import org.dice_research.squirrel.data.uri.serialize.Serializer;
 import org.dice_research.squirrel.fetcher.Fetcher;
-import org.dice_research.squirrel.fetcher.ckan.java.SimpleCkanFetcher;
-import org.dice_research.squirrel.fetcher.ftp.FTPFetcher;
-import org.dice_research.squirrel.fetcher.http.HTTPFetcher;
-import org.dice_research.squirrel.fetcher.manage.SimpleOrderedFetcherManager;
-import org.dice_research.squirrel.fetcher.sparql.SparqlBasedFetcher;
 import org.dice_research.squirrel.frontier.Frontier;
 import org.dice_research.squirrel.metadata.CrawlingActivity;
 import org.dice_research.squirrel.metadata.CrawlingActivity.CrawlingURIState;
@@ -35,7 +30,6 @@
 import org.dice_research.squirrel.worker.Worker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 
 /**
  * Standard implementation of the {@link Worker} interface.
@@ -102,10 +96,11 @@ public class WorkerImpl implements Worker, Closeable {
      * The directory to which a domain log will be written (or
      * {@code null} if no log should be written).
      */
-    public WorkerImpl(Frontier frontier, Sink sink, Analyzer analyzer, RobotsManager manager, Serializer serializer,
+    public WorkerImpl(Frontier frontier, Fetcher fetcher, Sink sink, Analyzer analyzer, RobotsManager manager, Serializer serializer,
             UriCollector collector, long waitingTime, String logDir, boolean sendAliveMessages) {
         this.frontier = frontier;
         this.sink = sink;
+        this.fetcher = fetcher;
         this.analyzer = analyzer;
         this.manager = manager;
         this.serializer = serializer;
@@ -123,9 +118,9 @@ public WorkerImpl(Frontier frontier, Fetcher fetcher, Sink sink, Analyzer analyzer, RobotsManager manager, Serializer serializer,
         }
         this.collector = collector;
-        fetcher = new SimpleOrderedFetcherManager(
-                // new SparqlBasedFetcher(),
-                new HTTPFetcher(), new SimpleCkanFetcher(), new FTPFetcher(), new SparqlBasedFetcher());
+//        fetcher = new SimpleOrderedFetcherManager(
+//                // new SparqlBasedFetcher(),
+//                new SparqlBasedFetcher(), new SimpleCkanFetcher(), new FTPFetcher(), new HTTPFetcher());
         analyzer = new SimpleOrderedAnalyzerManager(collector);
     }
 
@@ -220,6 +215,7 @@ public void performCrawling(CrawleableUri uri) {
         uri.addData(Constants.UUID_KEY, UUID.randomUUID().toString());
         CrawlingActivity activity = new CrawlingActivity(uri, getUri());
         uri.addData(Constants.URI_CRAWLING_ACTIVITY, activity);
+
         try {
             // Check robots.txt
             if (manager.isUriCrawlable(uri.getUri())) {
@@ -260,12 +256,17 @@ public void performCrawling(CrawleableUri uri) {
                 sink.openSinkForUri(uri);
                 collector.openSinkForUri(uri);
                 // Go over all files and analyze them
+
                 for (File data : fetchedFiles) {
                     if (data != null) {
                         fileList = fm.decompressFile(data);
+                        LOGGER.info("Found " + fileList.size() + " files after decompression");
+                        int cont = 1;
                         for (File file : fileList) {
+                            LOGGER.info("Analyzing file " + cont + " of " + fileList.size());
                             Iterator<byte[]> resultUris = analyzer.analyze(uri, file, sink);
                             sendNewUris(resultUris);
+                            cont++;
                         }
                     }
                 }
@@ -295,8 +296,10 @@ public void performCrawling(CrawleableUri uri) {
 //        LOGGER.debug("Fetched {} triples", count);
         setSpecificRecrawlTime(uri);
 
-        // Remove the activity since we don't want to send it back to the Frontier
-        uri.getData().remove(Constants.URI_CRAWLING_ACTIVITY);
+        } finally {
+            // Remove the activity since we don't want to send it back to the Frontier
+            uri.getData().remove(Constants.URI_CRAWLING_ACTIVITY);
+        }
 
         // TODO (this is only an unsatisfying quick fix to avoid unreadable graphs
         // because of too many nodes)
@@ -358,4 +361,9 @@ public void setTerminateFlag(boolean terminateFlag) {
         this.terminateFlag = terminateFlag;
     }
 
+    @Override
+    public int getId() {
+        return this.id;
+    }
+
 }
\ No newline at end of file