Commit 24de5f4

[FLINK-36497][table] Remove deprecated method CatalogTable#toProperties
Co-authored-by: shiwei10 <[email protected]>
xuyangzhong and Edward-Gavin committed Jan 13, 2025
1 parent b56d4ee commit 24de5f4
Showing 21 changed files with 35 additions and 278 deletions.
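
For context, the replacement API used throughout this commit is the `CatalogTable` builder. A minimal sketch is below; the class name, columns, comment, and options are illustrative placeholders, not values taken from the changed files:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;

import java.util.Collections;
import java.util.Map;

public class CatalogTableBuilderExample {

    // Illustrative replacement for `new CatalogTableImpl(schema, partitionKeys, properties, comment)`.
    static CatalogTable exampleTable() {
        return CatalogTable.newBuilder()
                .schema(
                        Schema.newBuilder()
                                .column("id", DataTypes.BIGINT())
                                .column("name", DataTypes.STRING())
                                .build())
                .comment("an example table")
                .partitionKeys(Collections.emptyList())
                .options(Map.of("connector", "datagen"))
                .build();
    }
}
```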
4 changes: 2 additions & 2 deletions docs/content.zh/docs/dev/table/catalogs.md
@@ -361,13 +361,13 @@ catalog.list_databases()
{{< tab "Java/Scala" >}}
```java
// create table
-catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
+catalog.createTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false);

// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// alter table
-catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
+catalog.alterTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false);

// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
4 changes: 2 additions & 2 deletions docs/content/docs/dev/table/catalogs.md
@@ -365,13 +365,13 @@ catalog.list_databases()
{{< tab "Java/Scala" >}}
```java
// create table
-catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
+catalog.createTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false);

// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// alter table
-catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
+catalog.alterTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false);

// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
8 changes: 6 additions & 2 deletions flink-python/pyflink/table/catalog.py
@@ -830,8 +830,12 @@ def create_table(

gateway = get_gateway()
return CatalogBaseTable(
-gateway.jvm.org.apache.flink.table.catalog.CatalogTableImpl(
-schema._j_table_schema, partition_keys, properties, comment))
+gateway.jvm.org.apache.flink.table.catalog.CatalogTable.newBuilder()
+.schema(schema._j_table_schema.toSchema())
+.comment(comment)
+.partitionKeys(partition_keys)
+.options(properties)
+.build())

@staticmethod
def create_view(
@@ -950,8 +950,7 @@ private ModifyOperation getModifyOperation(
}
Catalog catalog =
catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName());
-ResolvedCatalogTable catalogTable =
-catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+ResolvedCatalogTable catalogTable = createTableOperation.getCatalogTable();
Optional<DynamicTableSink> stagingDynamicTableSink =
getSupportsStagingDynamicTableSink(createTableOperation, catalog, catalogTable);
if (stagingDynamicTableSink.isPresent()) {
@@ -985,8 +984,7 @@ private ModifyOperation getModifyOperation(
ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
Catalog catalog =
catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName());
-ResolvedCatalogTable catalogTable =
-catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+ResolvedCatalogTable catalogTable = createTableOperation.getCatalogTable();
Optional<DynamicTableSink> stagingDynamicTableSink =
getSupportsStagingDynamicTableSink(createTableOperation, catalog, catalogTable);
if (stagingDynamicTableSink.isPresent()) {

This file was deleted.

@@ -93,13 +93,6 @@ public boolean isBatch() {
return isBatch;
}

-@Override
-public Map<String, String> toProperties() {
-// This effectively makes sure the table cannot be persisted in a catalog.
-throw new UnsupportedOperationException(
-"ConnectorCatalogTable cannot be converted to properties");
-}

@Override
public CatalogTable copy(Map<String, String> options) {
throw new UnsupportedOperationException(
@@ -28,6 +28,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
@@ -52,7 +53,8 @@ public class TableFactoryUtil {
public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Context context) {
try {
return TableFactoryService.find(
-TableSourceFactory.class, context.getTable().toProperties())
+TableSourceFactory.class,
+((ResolvedCatalogTable) context.getTable()).toProperties())
.createTableSource(context);
} catch (Throwable t) {
throw new TableException("findAndCreateTableSource failed.", t);
@@ -81,7 +83,8 @@ public static <T> TableSource<T> findAndCreateTableSource(
public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context context) {
try {
return TableFactoryService.find(
-TableSinkFactory.class, context.getTable().toProperties())
+TableSinkFactory.class,
+((ResolvedCatalogTable) context.getTable()).toProperties())
.createTableSink(context);
} catch (Throwable t) {
throw new TableException("findAndCreateTableSink failed.", t);
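
The casts above rely on the factory context supplying a `ResolvedCatalogTable`; per the deprecation notice on the default method removed further down, only the resolved form remains serializable to properties. A minimal sketch of that remaining path (the helper name is hypothetical):

```java
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;

import java.util.Map;

public class LegacyPropertiesExample {

    // Hypothetical helper: only a resolved table can still be serialized to properties.
    static Map<String, String> toLegacyProperties(CatalogTable table) {
        return ((ResolvedCatalogTable) table).toProperties();
    }
}
```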
@@ -21,8 +21,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;

@@ -34,13 +34,13 @@
@Internal
public class CreateTableOperation implements CreateOperation {
private final ObjectIdentifier tableIdentifier;
-private final CatalogTable catalogTable;
+private final ResolvedCatalogTable catalogTable;
private final boolean ignoreIfExists;
private final boolean isTemporary;

public CreateTableOperation(
ObjectIdentifier tableIdentifier,
-CatalogTable catalogTable,
+ResolvedCatalogTable catalogTable,
boolean ignoreIfExists,
boolean isTemporary) {
this.tableIdentifier = tableIdentifier;
@@ -49,7 +49,7 @@ public CreateTableOperation(
this.isTemporary = isTemporary;
}

-public CatalogTable getCatalogTable() {
+public ResolvedCatalogTable getCatalogTable() {
return catalogTable;
}

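
With `CreateTableOperation` now holding a `ResolvedCatalogTable`, resolution moves to the point where the operation is built rather than the planner call sites shown earlier. A hedged sketch of such a call site, assuming a `CatalogManager` is in scope and the usual package locations (names below are placeholders):

```java
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class CreateTableOperationExample {

    // Hypothetical helper: resolve the table first, then hand the resolved form to the operation.
    static CreateTableOperation newCreateTable(
            CatalogManager catalogManager, ObjectIdentifier identifier, CatalogTable rawTable) {
        ResolvedCatalogTable resolved = catalogManager.resolveCatalogTable(rawTable);
        return new CreateTableOperation(
                identifier, resolved, /* ignoreIfExists= */ false, /* isTemporary= */ false);
    }
}
```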

This file was deleted.

@@ -236,7 +236,7 @@ public static CatalogTable deserializeCatalogTable(
.schema(schema)
.comment(comment)
.partitionKeys(partitionKeys)
-.options(properties)
+.options(options)
.snapshot(snapshot)
.build();
} catch (Exception e) {
@@ -98,19 +98,6 @@ default TableKind getTableKind() {
*/
CatalogTable copy(Map<String, String> options);

-/**
-* Serializes this instance into a map of string-based properties.
-*
-* <p>Compared to the pure table options in {@link #getOptions()}, the map includes schema,
-* partitioning, and other characteristics in a serialized form.
-*
-* @deprecated Only a {@link ResolvedCatalogTable} is serializable to properties.
-*/
-@Deprecated
-default Map<String, String> toProperties() {
-return Collections.emptyMap();
-}

/** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
default Optional<Long> getSnapshot() {
return Optional.empty();