Josh Bradt created FLINK-12399:
----------------------------------

Summary: FilterableTableSource does not use filters on job run
Key: FLINK-12399
URL: https://issues.apache.org/jira/browse/FLINK-12399
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.8.0
Reporter: Josh Bradt
Attachments: flink-filter-bug.tar.gz

As discussed [on the mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html], there appears to be a bug where a job that uses a custom FilterableTableSource does not keep the filters that were pushed down into the table source. The table source does receive filters via applyPredicate, and it returns a new table source holding those filters. However, the final job graph appears to use the original table source, which contains no filters.

I attached a minimal example program to this ticket. The custom table source is as follows:

{code:java}
public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);

    // null means no filters have been pushed down into this instance
    private final Filter[] filters;
    private final FilterConverter converter = new FilterConverter();

    public CustomTableSource() {
        this(null);
    }

    private CustomTableSource(Filter[] filters) {
        this.filters = filters;
    }

    @Override
    public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
        // Only logs the pushed-down filters; this minimal example still returns every model
        if (filters == null) {
            LOG.info("==== No filters defined ====");
        } else {
            LOG.info("==== Found filters ====");
            for (Filter filter : filters) {
                LOG.info("FILTER: {}", filter);
            }
        }
        return execEnv.fromCollection(allModels());
    }

    @Override
    public TableSource<Model> applyPredicate(List<Expression> predicates) {
        LOG.info("Applying predicates");
        List<Filter> acceptedFilters = new ArrayList<>();
        for (final Expression predicate : predicates) {
            converter.convert(predicate).ifPresent(acceptedFilters::add);
        }
        // Returns a new instance carrying the accepted filters
        return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
    }

    @Override
    public boolean isFilterPushedDown() {
        return filters != null;
    }

    @Override
    public TypeInformation<Model> getReturnType() {
        return TypeInformation.of(Model.class);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }

    private List<Model> allModels() {
        List<Model> models = new ArrayList<>();
        models.add(new Model(1, 2, 3, 4));
        models.add(new Model(10, 11, 12, 13));
        models.add(new Model(20, 21, 22, 23));
        return models;
    }
}
{code}

When run, it logs

{noformat}
15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource - ==== No filters defined ====
{noformat}

This appears to indicate that although filters are being pushed down, the final job does not use them.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
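To make the reported symptom concrete, here is a small plain-Java model with no Flink dependency. `PushDownModel`, `Source`, `read` and the integer rows are hypothetical stand-ins, not Flink classes or APIs, and the model says nothing about how Flink's planner picks a source. It only mirrors the contract described in the report: `applyPredicate` returns a new instance carrying the filters and leaves the original untouched, so a job that ends up reading through the original instance sees every row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

/** Hypothetical plain-Java model of filter push-down; none of these types are Flink classes. */
public class PushDownModel {

    /** Simplified stand-in for a filterable table source over integer rows. */
    static final class Source {
        // null means "no filters pushed down", as in CustomTableSource
        private final List<IntPredicate> filters;

        Source() {
            this(null);
        }

        private Source(List<IntPredicate> filters) {
            this.filters = filters;
        }

        /** Mirrors applyPredicate: returns a NEW instance and leaves this one unchanged. */
        Source applyPredicate(List<IntPredicate> predicates) {
            return new Source(new ArrayList<>(predicates));
        }

        boolean isFilterPushedDown() {
            return filters != null;
        }

        /** Mirrors getDataSet: emits only the rows that pass every pushed-down filter. */
        List<Integer> read(List<Integer> rows) {
            List<Integer> out = new ArrayList<>();
            for (int row : rows) {
                boolean keep = true;
                if (filters != null) {
                    for (IntPredicate f : filters) {
                        if (!f.test(row)) {
                            keep = false;
                            break;
                        }
                    }
                }
                if (keep) {
                    out.add(row);
                }
            }
            return out;
        }
    }

    public static void main(String[] args) {
        List<Integer> rows = List.of(1, 10, 20);
        Source original = new Source();
        List<IntPredicate> predicates = List.of(x -> x > 5);
        Source pushed = original.applyPredicate(predicates);

        // Expected behaviour: the job reads through the instance returned by applyPredicate
        System.out.println("returned source: " + pushed.read(rows)); // [10, 20]
        // Reported symptom: the job reads through the original, unfiltered instance
        System.out.println("original source: " + original.read(rows)); // [1, 10, 20]
    }
}
```

Running `main` prints `returned source: [10, 20]` followed by `original source: [1, 10, 20]`, which corresponds to the "==== No filters defined ====" line in the log: the filtered copy exists, but the unfiltered original is the one that gets read.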