Till Rohrmann created FLINK-1195:
------------------------------------

Summary: Improvement of benchmarking infrastructure
Key: FLINK-1195
URL: https://issues.apache.org/jira/browse/FLINK-1195
Project: Flink
Issue Type: Wish
Reporter: Till Rohrmann

I noticed while running my ALS benchmarks that we still have some potential to improve our benchmarking infrastructure. The current state is that we execute the benchmark jobs by writing a script with a single set of parameters. The runtime is then manually retrieved from the web interface of Flink or Spark, respectively.

I think we need the following extensions:

* Automatic runtime retrieval and storage in a file
* Repeated execution of jobs to gather some "advanced" statistics such as the mean and standard deviation of the runtimes
* Support for value sets for the individual parameters

Automatic runtime retrieval would allow us to execute several benchmarks consecutively without having to look up the runtimes in the logs or in the web interface, which, by the way, only stores the runtimes of the last 5 jobs.

By value sets I mean that it would be nice to specify a set of parameter values for which the benchmark is run, without having to write a separate benchmark script for every single parameter combination. I believe this feature would come in very handy when we want to look at the runtime behaviour of Flink for different input sizes or degrees of parallelism, for example. To illustrate what I mean:

{code}
INPUTSIZE = 1000, 2000, 4000, 8000
DOP = 1, 2, 4, 8
OUTPUT=benchmarkResults
repetitions=10
command=benchmark.jar -p $DOP $INPUTSIZE
{code}

Something like that would execute the benchmark job with (DOP=1, INPUTSIZE=1000), (DOP=2, INPUTSIZE=2000), ..., 10 times each, calculate runtime statistics for each parameter combination, and store the results in the file benchmarkResults.

I believe that spending some effort now will pay off in the long run because we will benchmark Flink continuously. What do you guys think?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
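For illustration, the value-set expansion and repeated execution proposed above could be driven by a small script along the following lines. This is only a rough sketch in Python, not part of any existing tooling: the function names are hypothetical, and it assumes the zipped pairing of the value sets shown in the example, i.e. (DOP=1, INPUTSIZE=1000), (DOP=2, INPUTSIZE=2000), and so on. A real driver would invoke the benchmark jar (e.g. `benchmark.jar -p $DOP $INPUTSIZE`) via a subprocess and time it; here a stand-in job runner is injected so the sweep logic stands alone.

```python
import statistics

def run_sweep(value_sets, repetitions, run_job):
    """Expand zipped value sets, run each combination `repetitions` times,
    and return per-combination runtime statistics (mean and stdev).

    `value_sets` maps parameter names to equally long value lists; the i-th
    values of all parameters form one combination (zipped pairing, as in
    (DOP=1, INPUTSIZE=1000), (DOP=2, INPUTSIZE=2000), ...).
    `run_job(params)` runs one benchmark and returns its runtime.
    """
    names = list(value_sets)
    results = []
    for values in zip(*value_sets.values()):
        params = dict(zip(names, values))
        runtimes = [run_job(params) for _ in range(repetitions)]
        results.append({
            "params": params,
            "mean": statistics.mean(runtimes),
            "stdev": statistics.stdev(runtimes),
        })
    return results

# Stand-in job runner for the sketch; pretend the runtime grows with the
# input size. A real one would launch and time the benchmark job.
def fake_job(params):
    return 0.001 * params["INPUTSIZE"]

stats = run_sweep({"DOP": [1, 2, 4, 8], "INPUTSIZE": [1000, 2000, 4000, 8000]},
                  repetitions=10, run_job=fake_job)
for row in stats:
    print(row["params"], round(row["mean"], 3), round(row["stdev"], 3))
```

Writing the result rows to the configured OUTPUT file instead of printing them would complete the picture.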
Hi!
I totally agree. I can contribute a bash script that makes various runs with different jobs, different parameters, and jar files. It automatically collects times and logs for the runs.

Output:

{code}
Execution times (msecs):
KMeansPlainJava                64628  56234  62974  66003  66295
KMeansPlainScala               59961  53519  53922  54927  57295
KMeansSimNoKeySels            212684 239473 258493 205840 236463
KMeansImmutable_no_compact    206341 210160 233862 231071 225073
KMeansImmutable_compact       182459 189495 185829 196167 184058
KMeansImmutable_compact_assert 102597  96203 107883  96752 105110
KMeansMutable                  95092  91662 103233  88992  93104
{code}

Script:

{code}
#!/bin/bash

STRATOSPHERE_HOME="/share/nephele/stratosphere-scala"
JOBS_DIR="$STRATOSPHERE_HOME/examples/pact4s/perfTests"
RESULT_DIR="/home/sewen/scalaExperiments"
TIMES_FILE="times.txt"
HDFS="hdfs://cloud-7.dima.tu-berlin.de:40010/demo"

JOB_NAMES=(
  "WordCountPlainJava" "WordCountPlainScala" "WordCountSimNoKeySels"
  "WordCountImmutable_no_compact" "WordCountImmutable_compact"
  "WordCountImmutable_compact_assert" "WordCountMutable"
  "TPCHQuery3PlainJava" "TPCHQuery3PlainScala" "TPCHQuery3SimNoKeySels"
  "TPCHQuery3Immutable_no_compact" "TPCHQuery3Immutable_compact"
  "TPCHQuery3Immutable_compact_assert" "TPCHQuery3Mutable"
  "KMeansPlainJava" "KMeansPlainScala" "KMeansSimNoKeySels"
  "KMeansImmutable_no_compact" "KMeansImmutable_compact"
  "KMeansImmutable_compact_assert" "KMeansMutable")

JOB_JARS=(
  "pact4s-tests-0.2-WordCountPlainJava.jar"
  "pact4s-tests-0.2-WordCountPlainScala.jar"
  "pact4s-tests-0.2-WordCountSimNoKeySels.jar"
  "pact4s-tests-0.2-WordCountImmutable.jar"
  "pact4s-tests-0.2-WordCountImmutable.jar"
  "pact4s-tests-0.2-WordCountImmutable.jar"
  "pact4s-tests-0.2-WordCountMutable.jar"
  "pact4s-tests-0.2-TPCHQuery3PlainJava.jar"
  "pact4s-tests-0.2-TPCHQuery3PlainScala.jar"
  "pact4s-tests-0.2-TPCHQuery3SimNoKeySels.jar"
  "pact4s-tests-0.2-TPCHQuery3Immutable.jar"
  "pact4s-tests-0.2-TPCHQuery3Immutable.jar"
  "pact4s-tests-0.2-TPCHQuery3Immutable.jar"
  "pact4s-tests-0.2-TPCHQuery3Mutable.jar"
  "pact4s-tests-0.2-KMeansPlainJava.jar"
  "pact4s-tests-0.2-KMeansPlainScala.jar"
  "pact4s-tests-0.2-KMeansSimNoKeySels.jar"
  "pact4s-tests-0.2-KMeansImmutable.jar"
  "pact4s-tests-0.2-KMeansImmutable.jar"
  "pact4s-tests-0.2-KMeansImmutable.jar"
  "pact4s-tests-0.2-KMeansMutable.jar")

JOB_PARAMETERS=(
  "32 $HDFS/lipsum $HDFS/result_lipsum"
  "32 $HDFS/lipsum $HDFS/result_lipsum"
  "-subtasks 32 -input $HDFS/lipsum -output $HDFS/result_lipsum -nocompact -nohints"
  "-subtasks 32 -input $HDFS/lipsum -output $HDFS/result_lipsum -nocompact -nohints"
  "-subtasks 32 -input $HDFS/lipsum -output $HDFS/result_lipsum -nohints"
  "-subtasks 32 -input $HDFS/lipsum -output $HDFS/result_lipsum"
  "-subtasks 32 -input $HDFS/lipsum -output $HDFS/result_lipsum"
  "32 $HDFS/tpch/scale100/orders $HDFS/tpch/scale100/lineitem $HDFS/resultTPCH"
  "32 $HDFS/tpch/scale100/orders $HDFS/tpch/scale100/lineitem $HDFS/resultTPCH"
  "-subtasks 32 -orders $HDFS/tpch/scale100/orders -lineItems $HDFS/tpch/scale100/lineitem -output $HDFS/resultTPCH -nocompact -nohints"
  "-subtasks 32 -orders $HDFS/tpch/scale100/orders -lineItems $HDFS/tpch/scale100/lineitem -output $HDFS/resultTPCH -nocompact -nohints"
  "-subtasks 32 -orders $HDFS/tpch/scale100/orders -lineItems $HDFS/tpch/scale100/lineitem -output $HDFS/resultTPCH -nohints"
  "-subtasks 32 -orders $HDFS/tpch/scale100/orders -lineItems $HDFS/tpch/scale100/lineitem -output $HDFS/resultTPCH"
  "-subtasks 32 -orders $HDFS/tpch/scale100/orders -lineItems $HDFS/tpch/scale100/lineitem -output $HDFS/resultTPCH"
  "32 $HDFS/kmeans_2d/datapoints $HDFS/kmeans_2d/iter_0 $HDFS/result_x_1"
  "32 $HDFS/kmeans_2d/datapoints $HDFS/kmeans_2d/iter_0 $HDFS/result_x_1"
  "-subtasks 32 -numIterations 1 -dataPoints $HDFS/kmeans_2d/datapoints -initialCenters $HDFS/kmeans_2d/iter_0 -output $HDFS/result_x_1 -nocompact -nohints"
  "-subtasks 32 -numIterations 1 -dataPoints $HDFS/kmeans_2d/datapoints -initialCenters $HDFS/kmeans_2d/iter_0 -output $HDFS/result_x_1 -nocompact -nohints"
  "-subtasks 32 -numIterations 1 -dataPoints $HDFS/kmeans_2d/datapoints -initialCenters $HDFS/kmeans_2d/iter_0 -output $HDFS/result_x_1 -nohints"
  "-subtasks 32 -numIterations 1 -dataPoints $HDFS/kmeans_2d/datapoints -initialCenters $HDFS/kmeans_2d/iter_0 -output $HDFS/result_x_1"
  "-subtasks 32 -numIterations 1 -dataPoints $HDFS/kmeans_2d/datapoints -initialCenters $HDFS/kmeans_2d/iter_0 -output $HDFS/result_x_1")

NUM_RUNS=5

echo "Beginning Tests..."
echo "Execution times (msecs): " > "$RESULT_DIR/$TIMES_FILE"

for index in ${!JOB_NAMES[*]}; do
  job=${JOB_NAMES[$index]}
  jar=$JOBS_DIR/${JOB_JARS[$index]}
  params=${JOB_PARAMETERS[$index]}

  echo "Running $jar with arguments $params"
  echo -n "$job " >> "$RESULT_DIR/$TIMES_FILE"

  for test_run in $(seq 1 $NUM_RUNS); do
    res_dir="$RESULT_DIR/$job/$test_run"
    mkdir -p "$res_dir"

    echo "Run #$test_run"
    echo "Restarting Nephele Cluster..."
    $STRATOSPHERE_HOME/bin/stop-cluster.sh > /dev/null 2> /dev/null
    sleep 10
    rm -rf "$STRATOSPHERE_HOME/log/"*
    $STRATOSPHERE_HOME/bin/start-cluster.sh > /dev/null 2> /dev/null
    sleep 60

    echo "Running Job..."
    $STRATOSPHERE_HOME/bin/pact-client.sh run -j $jar -w -a $params \
      > "$res_dir/out.txt" 2> "$res_dir/err.txt"

    time_taken=$(grep 'Job duration (in ms):' "$res_dir/out.txt" | awk '{print $5}')
    echo -n " $time_taken" >> "$RESULT_DIR/$TIMES_FILE"

    echo "Copying files..."
    cp "$STRATOSPHERE_HOME/log/"* "$res_dir/"
  done
  echo '' >> "$RESULT_DIR/$TIMES_FILE"
done
{code}

On Tue, Oct 28, 2014 at 9:41 AM, Till Rohrmann (JIRA) <[hidden email]> wrote:
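As a complement, the per-job runtimes that the script appends to times.txt could be turned into the mean/standard-deviation summary asked for in the issue with a few lines of Python. This is only a sketch, assuming the `JobName t1 t2 ...` line format shown in the sample output above:

```python
import statistics

def summarize(lines):
    """Parse 'JobName t1 t2 ...' lines (the times-file format written by the
    script above) and return {job: (mean, stdev)} of the runtimes in ms."""
    summary = {}
    for line in lines:
        parts = line.split()
        # Skip the header, blank lines, and anything that is not a job row.
        if len(parts) < 2 or not all(p.isdigit() for p in parts[1:]):
            continue
        job, runtimes = parts[0], [int(t) for t in parts[1:]]
        summary[job] = (statistics.mean(runtimes), statistics.stdev(runtimes))
    return summary

# Example, using one line from the sample output shown above:
times = ["Execution times (msecs): ",
         "KMeansPlainJava 64628 56234 62974 66003 66295"]
for job, (mean, stdev) in summarize(times).items():
    print(f"{job}: mean={mean:.0f} ms, stdev={stdev:.0f} ms")
```

In practice the lines would come from reading the times file, e.g. `summarize(open("times.txt"))`.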