[jira] [Created] (FLINK-16571) Throw exception when current key is out of range


Shang Yuanchun (Jira)
&res created FLINK-16571:
----------------------------

             Summary: Throw exception when current key is out of range
                 Key: FLINK-16571
                 URL: https://issues.apache.org/jira/browse/FLINK-16571
             Project: Flink
          Issue Type: Improvement
          Components: API / Core
    Affects Versions: 1.9.2
            Reporter: &res


* I've got a stream of records that is keyed ("keyBy") on a class whose hashCode isn't stable across JVM instances.
* The records are then processed by a parallel operator, which is running on several task managers in the cluster.
* A given task manager receives a record, calculates the hashCode of the key, and sends it to another task manager instance according to its slot allocation.
* The other task manager instance receives the record for a given parallel slot (allocated by the first instance), calculates the hash for the record (which doesn't match the original hash), and then I get this error:
{code}
java.lang.NullPointerException
        at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
        at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
        at org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:90)
{code}
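
For context on the routing described above: a keyed record is routed by mapping its key's hashCode to a key group. Here is a minimal sketch of that assignment, assuming the KeyGroupRangeAssignment utility as it exists in the 1.9 codebase:
{code:java}
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupDemo {
  public static void main(String[] args) {
    int maxParallelism = 128; // Flink's default maximum parallelism
    Object key = "some-key";
    // Internally this is MathUtils.murmurHash(key.hashCode()) % maxParallelism,
    // so a key whose hashCode differs between JVMs can land in a key group on
    // the receiver that differs from the one the sender routed it to.
    int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
    System.out.println("key group = " + keyGroup);
  }
}
{code}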

It took me a while to figure out what was going on, and while the issue is on my side, it would have helped if InternalKeyContextImpl had complained in the first place that the hash code was out of range: InternalKeyContextImpl#setCurrentKeyGroupIndex should fail when currentKeyGroupIndex isn't in the keyGroupRange, with a message suggesting that the hash code function isn't stable.
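
A minimal sketch of what that check could look like (hypothetical code, not the actual Flink implementation; the field names follow InternalKeyContextImpl, and the exception type and message are only suggestions):
{code:java}
// Hypothetical guard in InternalKeyContextImpl#setCurrentKeyGroupIndex:
@Override
public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
    if (!keyGroupRange.contains(currentKeyGroupIndex)) {
        // The computed key group is outside the range assigned to this backend;
        // the most likely cause is a key whose hashCode isn't stable across JVMs.
        throw new IllegalArgumentException(String.format(
            "Key group index %d is not in %s. Make sure the hashCode of the key is stable across JVM instances.",
            currentKeyGroupIndex, keyGroupRange));
    }
    this.currentKeyGroupIndex = currentKeyGroupIndex;
}
{code}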


A few notes:
* The reason I'm getting the issue is that on the Oracle JDK, Enum's hashCode is identity-based (it uses the memory address of the enum object), so it isn't stable across JVM instances. I should use the ordinal when hashing to ensure stability (see the sketch after this list).
* The issue is more likely to happen if cluster.evenly-spread-out-slots is on (which forces the job to be distributed across different instances).
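
For reference, a minimal sketch of the fix on my side, assuming a hypothetical method added to the JobThatFails class from the reproducer below (DiscardingSink is Flink's no-op sink from org.apache.flink.streaming.api.functions.sink):
{code:java}
// Hypothetical fix: key by the enum's ordinal instead of the enum itself.
// Integer.hashCode is stable across JVM instances, while the enum's
// identity-based hashCode is not.
void defineGraphWithStableKey(DataStream<Tuple2<EnumKey, Integer>> src) {
  src.keyBy(p -> p.f0.ordinal())
      .map(new StatefulCounter())
      .addSink(new DiscardingSink<>());
}
{code}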

Here's an example that replicates the issue. It has to run on the Oracle JVM on a cluster of a few nodes, with cluster.evenly-spread-out-slots enabled:

{code:java}

import java.lang.reflect.Field;
import java.util.Random;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobThatFails {

  private static final Logger LOG = LoggerFactory.getLogger(JobThatFails.class);

  private enum EnumKey {
    ONE,
    TWO,
    THREE,
    FOUR,
    FIVE,
    SIX,
    SEVEN,
    EIGHT,
    NINE,
    TEN
  }

  void defineGraph(StreamExecutionEnvironment env) {

    int parallelism = 4;
    DataStream<Tuple2<EnumKey, Integer>> src = null;
    for (int i = 0; i < 10; ++i) {
      DataStream<Tuple2<EnumKey, Integer>> eachSrc = env.addSource(new IntegerSourceFunction());
      if (src == null) {
        src = eachSrc;
      } else {
        src = src.union(eachSrc);
      }
    }

    src.keyBy(p -> p.f0)
        .map(new StatefulCounter())
        .name(StatefulCounter.class.getSimpleName())
        .setParallelism(parallelism)
        .addSink(
            new SinkFunction<Integer>() {
              @Override
              public void invoke(Integer value, Context context) {}
            });
  }

  private static class IntegerSourceFunction implements SourceFunction<Tuple2<EnumKey, Integer>> {

    // Initialize to true so that a cancel() arriving before run() still stops the loop.
    private volatile boolean running = true;

    private IntegerSourceFunction() {}

    @Override
    public void run(SourceContext<Tuple2<EnumKey, Integer>> ctx) throws Exception {
      Random random = new Random();
      while (running) {
        for (EnumKey e : EnumKey.values()) {
          ctx.collect(Tuple2.of(e, random.nextInt(10)));
        }
        Thread.sleep(1000);
      }
    }

    @Override
    public void cancel() {
      running = false;
    }
  }

  // Static so the function doesn't capture (and try to serialize) the enclosing JobThatFails.
  private static class StatefulCounter extends RichMapFunction<Tuple2<EnumKey, Integer>, Integer> {

    private transient ValueState<Integer> stateHandle;

    @Override
    public Integer map(Tuple2<EnumKey, Integer> value) throws Exception {
      LOG.info(getDebugMessage());
      Integer state = stateHandle.value();
      if (state == null) {
        state = 0;
      }
      state = state + value.f1;
      stateHandle.update(state);
      return state;
    }

    @Override
    public void open(Configuration configuration) {
      stateHandle =
          getRuntimeContext()
              .getState(
                  new ValueStateDescriptor<>(
                      JobThatFails.class.getSimpleName(), TypeInformation.of(Integer.class)));
    }

    private String getDebugMessage() {
      // Reflectively read the heap state backend's internals to show the mismatch
      // between the computed key group index and the backend's assigned range.
      Object stateTable = JobThatFails.readField(stateHandle, "stateTable");
      Object keyContext = JobThatFails.readField(stateTable, "keyContext");
      Object currentNamespace = JobThatFails.readField(stateHandle, "currentNamespace");
      Object length = ((Object[]) JobThatFails.readField(stateTable, "keyGroupedStateMaps")).length;
      Object keyGroupOffset = JobThatFails.readField(stateTable, "keyGroupOffset");
      KeyGroupRange keyGroupRange =
          (KeyGroupRange) JobThatFails.readField(keyContext, "keyGroupRange");

      return String.format(
          "numberOfKeyGroups=%s currentKey=%s currentKeyGroupIndex=%s keyGroupOffset=%s currentNamespace=%s length=%s startKeyGroup=%s endKeyGroup=%s",
          JobThatFails.readField(keyContext, "numberOfKeyGroups"),
          JobThatFails.readField(keyContext, "currentKey"),
          JobThatFails.readField(keyContext, "currentKeyGroupIndex"),
          keyGroupOffset,
          currentNamespace,
          length,
          keyGroupRange.getStartKeyGroup(),
          keyGroupRange.getEndKeyGroup());
    }
  }

  static Object readField(Object object, String name) {
    try {
      Field field = getField(name, object.getClass());
      field.setAccessible(true);
      return field.get(object);
    } catch (IllegalAccessException | NoSuchFieldException | SecurityException e) {
      throw new RuntimeException(
          String.format(
              "Cannot read %s from %s %s", name, object.getClass().getSimpleName(), object),
          e);
    }
  }

  static Field getField(String name, Class<?> clazz) throws NoSuchFieldException {
    try {
      return clazz.getDeclaredField(name);
    } catch (NoSuchFieldException e) {
      if (clazz.getSuperclass() == null) {
        throw e;
      } else {
        return getField(name, clazz.getSuperclass());
      }
    }
  }

  public static void main(String[] args) throws Exception {
    JobThatFails job = new JobThatFails();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    job.defineGraph(env);
    env.execute(JobThatFails.class.getSimpleName());
  }
}

{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)