Skip to content
This repository has been archived by the owner on Oct 16, 2024. It is now read-only.

Commit

Permalink
ISSUE #211: Support listing logs by prefix
Browse files Browse the repository at this point in the history
Descriptions of the changes in this PR:

- extend `getLogs` to `getLogs(prefix)`, so it provides a filesystem `listFiles`-like semantic.

Author: Sijie Guo <[email protected]>

Reviewers: Jia Zhai <None>

This closes #212 from sijie/5_support_list_logs_by_prefix_pr, closes #211
  • Loading branch information
sijie committed Oct 18, 2017
1 parent 442e000 commit 3610f0f
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,14 @@ public boolean logExists(String logName)
@Override
public Iterator<String> getLogs() throws IOException {
checkState();
return Utils.ioResult(driver.getLogMetadataStore().getLogs());
return Utils.ioResult(driver.getLogMetadataStore().getLogs(""));
}

@Override
public Iterator<String> getLogs(String logNamePrefix) throws IOException {
checkState();
logNamePrefix = validateAndNormalizeName(logNamePrefix);
return Utils.ioResult(driver.getLogMetadataStore().getLogs(logNamePrefix));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ boolean logExists(String logName)
Iterator<String> getLogs()
throws IOException;

/**
* Retrieve the logs under a given <i>logNamePrefix</i>.
*
* @param logNamePrefix log name prefix
* @return iterator of the logs under the log name prefix
* @throws IOException when encountered issues with backend.
*/
Iterator<String> getLogs(String logNamePrefix)
throws IOException;

//
// Methods for namespace
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.callback.NamespaceListener;
Expand Down Expand Up @@ -70,9 +71,14 @@ public CompletableFuture<Optional<URI>> getLogLocation(String logName) {
}

@Override
public CompletableFuture<Iterator<String>> getLogs() {
public CompletableFuture<Iterator<String>> getLogs(String logNamePrefix) {
final CompletableFuture<Iterator<String>> promise = new CompletableFuture<Iterator<String>>();
final String nsRootPath = namespace.getPath();
final String nsRootPath;
if (StringUtils.isEmpty(logNamePrefix)) {
nsRootPath = namespace.getPath();
} else {
nsRootPath = namespace.getPath() + "/" + logNamePrefix;
}
try {
final ZooKeeper zk = zkc.get();
zk.sync(nsRootPath, new AsyncCallback.VoidCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,12 @@ public void processResult(int rc, String path, Object ctx, Stat stat) {
}

@Override
public CompletableFuture<Iterator<String>> getLogs() {
public CompletableFuture<Iterator<String>> getLogs(String logNamePrefix) {
if (!"".equals(logNamePrefix)) {
return FutureUtils.exception(
new UnexpectedException("Get logs by prefix is not supported by federated metadata store"));
}

if (duplicatedLogFound.get()) {
return duplicatedLogException(duplicatedLogName.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ public interface LogMetadataStore {
/**
* Retrieves logs from the namespace.
*
* @param logNamePrefix
* log name prefix.
* @return iterator of logs of the namespace.
*/
CompletableFuture<Iterator<String>> getLogs();
CompletableFuture<Iterator<String>> getLogs(String logNamePrefix);

/**
* Register a namespace listener on streams changes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.distributedlog.impl;

import static org.junit.Assert.*;

import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import java.net.URI;
Expand All @@ -36,8 +37,6 @@
import org.junit.Test;
import org.junit.rules.TestName;



/**
* Test ZK based metadata store.
*/
Expand Down Expand Up @@ -106,7 +105,20 @@ public void testGetLogs() throws Exception {
logs.add(logName);
createLogInNamespace(uri, logName);
}
Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs()));
Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs("")));
assertEquals(10, result.size());
assertTrue(Sets.difference(logs, result).isEmpty());
}

@Test(timeout = 60000)
public void testGetLogsPrefix() throws Exception {
Set<String> logs = Sets.newHashSet();
for (int i = 0; i < 10; i++) {
String logName = "test-" + i;
logs.add(logName);
createLogInNamespace(uri, "test/" + logName);
}
Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs("test")));
assertEquals(10, result.size());
assertTrue(Sets.difference(logs, result).isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void testBasicOperations() throws Exception {
assertEquals(logName, logsIter.next());
assertFalse(logsIter.hasNext());
// get logs should return the log
Iterator<String> newLogsIter = Utils.ioResult(metadataStore.getLogs());
Iterator<String> newLogsIter = Utils.ioResult(metadataStore.getLogs(""));
assertTrue(newLogsIter.hasNext());
assertEquals(logName, newLogsIter.next());
assertFalse(newLogsIter.hasNext());
Expand Down Expand Up @@ -274,7 +274,7 @@ public void testDuplicatedLogs() throws Exception {
assertTrue(metadataStore.duplicatedLogFound.get());
}
try {
Utils.ioResult(metadataStore.getLogs());
Utils.ioResult(metadataStore.getLogs(""));
fail("should throw exception when duplicated log found");
} catch (UnexpectedException ue) {
// should throw unexpected exception
Expand Down Expand Up @@ -338,7 +338,7 @@ public void testGetLogs() throws Exception {
do {
TimeUnit.MILLISECONDS.sleep(20);
receivedLogs = new TreeSet<String>();
Iterator<String> logs = Utils.ioResult(metadataStore.getLogs());
Iterator<String> logs = Utils.ioResult(metadataStore.getLogs(""));
receivedLogs.addAll(Lists.newArrayList(logs));
} while (receivedLogs.size() < numLogs);
assertEquals(numLogs, receivedLogs.size());
Expand Down Expand Up @@ -387,7 +387,7 @@ public void testCreateLogPickingFirstAvailableSubNamespace() throws Exception {
do {
TimeUnit.MILLISECONDS.sleep(20);
receivedLogs = new TreeSet<String>();
Iterator<String> logs = Utils.ioResult(metadataStore.getLogs());
Iterator<String> logs = Utils.ioResult(metadataStore.getLogs(""));
receivedLogs.addAll(Lists.newArrayList(logs));
} while (receivedLogs.size() < 3 * maxLogsPerSubnamespace - 1);

Expand Down

0 comments on commit 3610f0f

Please sign in to comment.