Skip to content

Commit

Permalink
Add meta commands and implement :help and :expire
Browse files Browse the repository at this point in the history
  • Loading branch information
kawamuray committed Nov 2, 2020
1 parent 8383a61 commit 14b5328
Show file tree
Hide file tree
Showing 12 changed files with 408 additions and 5 deletions.
23 changes: 23 additions & 0 deletions src/main/java/kmql/Command.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package kmql;

import java.io.BufferedOutputStream;
import java.util.List;

/**
* An interface of the kmql meta commands.
*/
public interface Command {
/**
* Return the help string of this command.
* @return help string of this command.
*/
String help();

/**
* Execute this command.
* @param args command arguments.
* @param engine a {@link Engine} of the executing context.
* @param output output stream to write any outputs.
*/
void execute(List<String> args, Engine engine, BufferedOutputStream output);
}
66 changes: 66 additions & 0 deletions src/main/java/kmql/CommandRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package kmql;

import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import kmql.command.ExpireCommand;
import kmql.command.HelpCommand;

/**
* Registry of meta commands.
*/
public class CommandRegistry implements Iterable<Entry<String, Command>> {
public static final CommandRegistry DEFAULT = new CommandRegistry();

static {
registerDefault("help", new HelpCommand());
registerDefault("expire", new ExpireCommand());
}

private final ConcurrentMap<String, Command> commands;

public CommandRegistry() {
commands = new ConcurrentHashMap<>();
}

/**
* Register the given command under the given name to the default registry.
* @param name the name of the command.
* @param command the command instance.
*/
public static void registerDefault(String name, Command command) {
DEFAULT.register(name, command);
}

/**
* Register the given command under the given name.
* @param name the name of the command.
* @param command the command instance.
*/
public void register(String name, Command command) {
if (commands.putIfAbsent(name, command) != null) {
throw new IllegalArgumentException("conflicting command name: " + name);
}
}

/**
* Lookup a command by the name.
* @param name the name of the command.
* @return an {@link Command} if presents.
*/
public Optional<Command> lookup(String name) {
return Optional.ofNullable(commands.get(name));
}

/**
* Returns the iterator over command entries registered in this registry.
* @return an iterator for command name and command instances.
*/
@Override
public Iterator<Entry<String, Command>> iterator() {
return commands.entrySet().iterator();
}
}
44 changes: 44 additions & 0 deletions src/main/java/kmql/Commands.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package kmql;

import java.io.BufferedOutputStream;
import java.util.Arrays;
import java.util.List;

/**
* Entrypoint to execute meta commands.
*/
public class Commands {
public static final String COMMAND_PREFIX = ":";

private Commands() {}

/**
* Return true if the given line is intending to execute meta command.
* @param line a line to test.
* @return true if the given line is a meta command line.
*/
public static boolean isCommand(String line) {
return line.trim().startsWith(COMMAND_PREFIX);
}

/**
* Execute the given line by parsing, looking up and executing command.
* @param engine an {@link Engine} of executing context.
* @param output a stream to write command output.
* @param line meta command line.
*/
public static void executeLine(Engine engine, BufferedOutputStream output, String line) {
List<String> cmdline = parseLine(line);
Command command = engine.commandRegistry().lookup(cmdline.get(0)).orElseThrow(
() -> new IllegalArgumentException("no such command: " + cmdline.get(0)));
command.execute(cmdline.subList(1, cmdline.size()), engine, output);
}

private static List<String> parseLine(String line) {
List<String> cmdline = Arrays.asList(line.split("\\s+"));
String cmd = cmdline.get(0);
String commandName = cmd.substring(COMMAND_PREFIX.length()).trim();
cmdline.set(0, commandName);
return cmdline;
}
}
14 changes: 14 additions & 0 deletions src/main/java/kmql/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ public void dropTable(String name) throws SQLException {
meta.initialized = false;
}

/**
* Drop all tables.
* @throws SQLException when SQL failed.
*/
public void dropAllTables() throws SQLException {
for (Entry<String, TableMetadata> entry : tables.entrySet()) {
String name = entry.getKey();
TableMetadata meta = entry.getValue();
if (meta.initialized) {
dropTable(name);
}
}
}

/**
* Execute the given query and call the given handler with the {@link ResultSet}.
* @param sql an SQL query.
Expand Down
15 changes: 12 additions & 3 deletions src/main/java/kmql/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@
import org.apache.kafka.clients.admin.AdminClient;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.experimental.Accessors;

/**
* A core runtime of kmql.
*/
@AllArgsConstructor
@Accessors(fluent = true)
@Getter
public class Engine implements AutoCloseable {
private final AdminClient adminClient;
private final Database db;
private final OutputFormatRegistry outputFormatRegistry;
private final CommandRegistry commandRegistry;
@NonNull
private OutputFormat outputFormat;

Expand All @@ -29,10 +34,9 @@ public class Engine implements AutoCloseable {
* @return an {@link Engine}.
*/
public static Engine from(AdminClient adminClient, String outputFormatName) {
OutputFormatRegistry outputFormatRegistry = OutputFormatRegistry.DEFAULT;
OutputFormat outputFormat = lookupOutputFormat(outputFormatRegistry, outputFormatName);
OutputFormat outputFormat = lookupOutputFormat(OutputFormatRegistry.DEFAULT, outputFormatName);
Database db = Database.from(TableRegistry.DEFAULT);
return new Engine(adminClient, db, outputFormatRegistry, outputFormat);
return new Engine(adminClient, db, OutputFormatRegistry.DEFAULT, CommandRegistry.DEFAULT, outputFormat);
}

/**
Expand All @@ -43,6 +47,11 @@ public static Engine from(AdminClient adminClient, String outputFormatName) {
* @throws SQLException when SQL fails.
*/
public void execute(String command, BufferedOutputStream output) throws SQLException {
if (Commands.isCommand(command)) {
Commands.executeLine(this, output, command);
return;
}

prepareRequiredTables(command);
db.executeQuery(command, results -> {
try {
Expand Down
39 changes: 39 additions & 0 deletions src/main/java/kmql/command/ExpireCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package kmql.command;

import java.io.BufferedOutputStream;
import java.io.PrintWriter;
import java.util.List;

import kmql.Command;
import kmql.Engine;

/**
* Expire table caches.
*/
public class ExpireCommand implements Command {
@Override
public String help() {
return ":expire - Expire all initialized tables (tables are re-created when next time they're queried)\n"
+
":expire TABLE1[ TABLE2...] - Expire specified tables";
}

@Override
public void execute(List<String> args, Engine engine, BufferedOutputStream output) {
PrintWriter pw = new PrintWriter(output);
try {
if (args.isEmpty()) {
pw.println("Expiring ALL tables...");
engine.db().dropAllTables();
} else {
for (String table : args) {
pw.printf("Expiring table %s...\n", table);
engine.db().dropTable(table);
}
}
} catch (Exception e) {
pw.println("Failed to drop table: " + e.getMessage());
}
pw.flush();
}
}
38 changes: 38 additions & 0 deletions src/main/java/kmql/command/HelpCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package kmql.command;

import java.io.BufferedOutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;

import kmql.Command;
import kmql.Engine;

/**
* Show help for interactive console.
*/
public class HelpCommand implements Command {
@Override
public String help() {
return ":help - Show this help";
}

@Override
public void execute(List<String> args, Engine engine, BufferedOutputStream output) {
PrintWriter pw = new PrintWriter(output);
pw.println("Execute SQL:");
pw.println(" SELECT * FROM $table WHERE condA = x LIMIT 3;");
pw.println("Meta commands:");
List<String> helpLines = new ArrayList<>();
for (Entry<String, Command> entry : engine.commandRegistry()) {
helpLines.add(entry.getValue().help());
}
Collections.sort(helpLines);
for (String helpLine : helpLines) {
pw.println(helpLine.trim());
}
pw.flush();
}
}
24 changes: 24 additions & 0 deletions src/test/java/kmql/CommandRegistryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package kmql;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;

import org.junit.Test;

public class CommandRegistryTest {
private final CommandRegistry registry = new CommandRegistry();

@Test
public void register() {
Command command = mock(Command.class);
registry.register("xyz", command);
assertSame(command, registry.lookup("xyz").get());
}

@Test(expected = IllegalArgumentException.class)
public void registerTwice() {
Command command = mock(Command.class);
registry.register("xyz", command);
registry.register("xyz", command);
}
}
56 changes: 56 additions & 0 deletions src/test/java/kmql/CommandsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package kmql;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;

import org.junit.Test;

public class CommandsTest {
private final Command xyzCommand = new Command() {
@Override
public String help() {
return ":xyz";
}

@Override
public void execute(List<String> args, Engine engine, BufferedOutputStream output) {
PrintWriter pw = new PrintWriter(output);
pw.printf("xyz,%s", String.join(",", args));
pw.flush();
}
};

private final CommandRegistry registry = new CommandRegistry() {{
register("xyz", xyzCommand);
}};

@Test
public void isCommand() {
assertTrue(Commands.isCommand(":help"));
assertTrue(Commands.isCommand(":help arg1 arg2"));
assertTrue(Commands.isCommand(" :help "));
assertFalse(Commands.isCommand("SELECT * FROM :xyz"));
assertFalse(Commands.isCommand(" SELECT * FROM table ;"));
}

@Test
public void executeLine() throws IOException {
Engine engine = mock(Engine.class);
doReturn(registry).when(engine).commandRegistry();

ByteArrayOutputStream output = new ByteArrayOutputStream();
try (BufferedOutputStream bout = new BufferedOutputStream(output)) {
Commands.executeLine(engine, bout, ":xyz foo bar");
}
assertEquals("xyz,foo,bar", new String(output.toByteArray()));
}
}
8 changes: 8 additions & 0 deletions src/test/java/kmql/DatabaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ public void dropAbsentTable() throws Exception {
db.dropTable("xyz");
}

@Test
public void dropAllTables() throws Exception {
db.prepareTable("xyz", adminClient);
db.dropAllTables();
assertFalse(SqlUtils.tableExists(connection, "xyz"));
assertFalse(SqlUtils.tableExists(connection, "foo"));
}

@Test
public void executeQuery() throws Exception {
db.prepareTable("xyz", adminClient);
Expand Down
Loading

0 comments on commit 14b5328

Please sign in to comment.