Hi
Sorry if i mistake with mailing list. After BEAM-102 was solved in FlinkStreamingPipelineTranslator we have code in visitPrimitiveTransform: if (translator == null && applyCanTranslate(transform, node, translator)) { LOG.info(node.getTransform().getClass().toString()); throw new UnsupportedOperationException( "The transform " + transform + " is currently not supported."); } applyStreamingTransform(transform, node, translator); but applyCanTranslate and applyStreamingTransform always require NotNull translator as result if you try use side input in your code then you will cause NPE Maybe Aljoscha Krettek could describe how this code must work? Regards, Alexey Diomin |
Hi,
I think this is more suited for the Beam dev list. Nevertheless, I think this is a coding error and the condition should be if (translator != null && !applyCanTranslate(transform, node, translator)) With what program did you encounter an NPE, it seems to me that this should rarely happen, at least it doesn't happen in all the Beam runner tests. Cheers, Aljoscha On Wed, 31 Aug 2016 at 11:27 Demin Alexey <[hidden email]> wrote: > Hi > > Sorry if i mistake with mailing list. > > After BEAM-102 was solved in FlinkStreamingPipelineTranslator we have code > in visitPrimitiveTransform: > > > if (translator == null && applyCanTranslate(transform, node, translator)) { > LOG.info(node.getTransform().getClass().toString()); > throw new UnsupportedOperationException( > "The transform " + transform + " is currently not supported."); > } > applyStreamingTransform(transform, node, translator); > > > but applyCanTranslate and applyStreamingTransform always require NotNull > translator > as result if you try use side input in your code then you will cause NPE > > Maybe Aljoscha Krettek could describe how this code must work? > > > Regards, > Alexey Diomin > |
Hi
If we can change code on translator != null then next line ( applyStreamingTransform(transform, node, translator); ) will cause NPE It's main problem why I don't understand code: x = null; if (x == null && f1_null_value_forbid(x)) { ..} f2_null_value_forbid(x); change (x == null) => (x !=null) simple change point of NPE 2016-08-31 13:43 GMT+04:00 Aljoscha Krettek <[hidden email]>: > Hi, > I think this is more suited for the Beam dev list. Nevertheless, I think > this is a coding error and the condition should be > if (translator != null && !applyCanTranslate(transform, node, translator)) > > With what program did you encounter an NPE, it seems to me that this should > rarely happen, at least it doesn't happen in all the Beam runner tests. > > Cheers, > Aljoscha > > On Wed, 31 Aug 2016 at 11:27 Demin Alexey <[hidden email]> wrote: > > > Hi > > > > Sorry if i mistake with mailing list. > > > > After BEAM-102 was solved in FlinkStreamingPipelineTranslator we have > code > > in visitPrimitiveTransform: > > > > > > if (translator == null && applyCanTranslate(transform, node, > translator)) { > > LOG.info(node.getTransform().getClass().toString()); > > throw new UnsupportedOperationException( > > "The transform " + transform + " is currently not supported."); > > } > > applyStreamingTransform(transform, node, translator); > > > > > > but applyCanTranslate and applyStreamingTransform always require NotNull > > translator > > as result if you try use side input in your code then you will cause NPE > > > > Maybe Aljoscha Krettek could describe how this code must work? > > > > > > Regards, > > Alexey Diomin > > > |
Program for reproduce
https://gist.github.com/xhumanoid/d784a4463a45e68acb124709a521156e 1) options.setStreaming(false); - we have NPE and i can't understand how code work 2) options.setStreaming(true); - pipeline can compile (he still have error, but it's my incorrect work with window) 2016-08-31 13:53 GMT+04:00 Demin Alexey <[hidden email]>: > Hi > > If we can change code on translator != null then next line ( > applyStreamingTransform(transform, node, translator); ) will cause NPE > > It's main problem why I don't understand code: > > x = null; > if (x == null && f1_null_value_forbid(x)) { ..} > f2_null_value_forbid(x); > > change (x == null) => (x !=null) simple change point of NPE > > > 2016-08-31 13:43 GMT+04:00 Aljoscha Krettek <[hidden email]>: > >> Hi, >> I think this is more suited for the Beam dev list. Nevertheless, I think >> this is a coding error and the condition should be >> if (translator != null && !applyCanTranslate(transform, node, translator)) >> >> With what program did you encounter an NPE, it seems to me that this >> should >> rarely happen, at least it doesn't happen in all the Beam runner tests. >> >> Cheers, >> Aljoscha >> >> On Wed, 31 Aug 2016 at 11:27 Demin Alexey <[hidden email]> wrote: >> >> > Hi >> > >> > Sorry if i mistake with mailing list. >> > >> > After BEAM-102 was solved in FlinkStreamingPipelineTranslator we have >> code >> > in visitPrimitiveTransform: >> > >> > >> > if (translator == null && applyCanTranslate(transform, node, >> translator)) { >> > LOG.info(node.getTransform().getClass().toString()); >> > throw new UnsupportedOperationException( >> > "The transform " + transform + " is currently not supported."); >> > } >> > applyStreamingTransform(transform, node, translator); >> > >> > >> > but applyCanTranslate and applyStreamingTransform always require NotNull >> > translator >> > as result if you try use side input in your code then you will cause NPE >> > >> > Maybe Aljoscha Krettek could describe how this code must work? >> > >> > >> > Regards, >> > Alexey Diomin >> > >> > > |
Ah I see, an unbounded source, such as the Kafka source does not work in
batch mode (which streamStreaming(false) enables). The code should work in streaming mode if you apply some window that is compatible with the side-input window to the main input. I think the code in streaming still works because there cannot be cases where the translator is null right now. The correct check should be this, though: if (translator == null || !applyCanTranslate(transform, node, translator)) Cheers, Aljoscha On Wed, 31 Aug 2016 at 12:07 Demin Alexey <[hidden email]> wrote: > Program for reproduce > > https://gist.github.com/xhumanoid/d784a4463a45e68acb124709a521156e > > 1) options.setStreaming(false); - we have NPE and i can't understand how > code work > 2) options.setStreaming(true); - pipeline can compile (he still have > error, but it's my incorrect work with window) > > > 2016-08-31 13:53 GMT+04:00 Demin Alexey <[hidden email]>: > > > Hi > > > > If we can change code on translator != null then next line ( > > applyStreamingTransform(transform, node, translator); ) will cause NPE > > > > It's main problem why I don't understand code: > > > > x = null; > > if (x == null && f1_null_value_forbid(x)) { ..} > > f2_null_value_forbid(x); > > > > change (x == null) => (x !=null) simple change point of NPE > > > > > > 2016-08-31 13:43 GMT+04:00 Aljoscha Krettek <[hidden email]>: > > > >> Hi, > >> I think this is more suited for the Beam dev list. Nevertheless, I think > >> this is a coding error and the condition should be > >> if (translator != null && !applyCanTranslate(transform, node, > translator)) > >> > >> With what program did you encounter an NPE, it seems to me that this > >> should > >> rarely happen, at least it doesn't happen in all the Beam runner tests. > >> > >> Cheers, > >> Aljoscha > >> > >> On Wed, 31 Aug 2016 at 11:27 Demin Alexey <[hidden email]> wrote: > >> > >> > Hi > >> > > >> > Sorry if i mistake with mailing list. > >> > > >> > After BEAM-102 was solved in FlinkStreamingPipelineTranslator we have > >> code > >> > in visitPrimitiveTransform: > >> > > >> > > >> > if (translator == null && applyCanTranslate(transform, node, > >> translator)) { > >> > LOG.info(node.getTransform().getClass().toString()); > >> > throw new UnsupportedOperationException( > >> > "The transform " + transform + " is currently not supported."); > >> > } > >> > applyStreamingTransform(transform, node, translator); > >> > > >> > > >> > but applyCanTranslate and applyStreamingTransform always require > NotNull > >> > translator > >> > as result if you try use side input in your code then you will cause > NPE > >> > > >> > Maybe Aljoscha Krettek could describe how this code must work? > >> > > >> > > >> > Regards, > >> > Alexey Diomin > >> > > >> > > > > > |
Thanks
with if (translator == null || !applyCanTranslate(transform, node, translator)) all working as expectected Regards, Alexey Diomin 2016-08-31 14:12 GMT+04:00 Aljoscha Krettek <[hidden email]>: > Ah I see, an unbounded source, such as the Kafka source does not work in > batch mode (which streamStreaming(false) enables). The code should work in > streaming mode if you apply some window that is compatible with the > side-input window to the main input. > > I think the code in streaming still works because there cannot be cases > where the translator is null right now. The correct check should be this, > though: > if (translator == null || !applyCanTranslate(transform, node, translator)) > > Cheers, > Aljoscha > > On Wed, 31 Aug 2016 at 12:07 Demin Alexey <[hidden email]> wrote: > > > Program for reproduce > > > > https://gist.github.com/xhumanoid/d784a4463a45e68acb124709a521156e > > > > 1) options.setStreaming(false); - we have NPE and i can't understand how > > code work > > 2) options.setStreaming(true); - pipeline can compile (he still have > > error, but it's my incorrect work with window) > > > > > > 2016-08-31 13:53 GMT+04:00 Demin Alexey <[hidden email]>: > > > > > Hi > > > > > > If we can change code on translator != null then next line ( > > > applyStreamingTransform(transform, node, translator); ) will cause NPE > > > > > > It's main problem why I don't understand code: > > > > > > x = null; > > > if (x == null && f1_null_value_forbid(x)) { ..} > > > f2_null_value_forbid(x); > > > > > > change (x == null) => (x !=null) simple change point of NPE > > > > > > > > > 2016-08-31 13:43 GMT+04:00 Aljoscha Krettek <[hidden email]>: > > > > > >> Hi, > > >> I think this is more suited for the Beam dev list. Nevertheless, I > think > > >> this is a coding error and the condition should be > > >> if (translator != null && !applyCanTranslate(transform, node, > > translator)) > > >> > > >> With what program did you encounter an NPE, it seems to me that this > > >> should > > >> rarely happen, at least it doesn't happen in all the Beam runner > tests. > > >> > > >> Cheers, > > >> Aljoscha > > >> > > >> On Wed, 31 Aug 2016 at 11:27 Demin Alexey <[hidden email]> wrote: > > >> > > >> > Hi > > >> > > > >> > Sorry if i mistake with mailing list. > > >> > > > >> > After BEAM-102 was solved in FlinkStreamingPipelineTranslator we > have > > >> code > > >> > in visitPrimitiveTransform: > > >> > > > >> > > > >> > if (translator == null && applyCanTranslate(transform, node, > > >> translator)) { > > >> > LOG.info(node.getTransform().getClass().toString()); > > >> > throw new UnsupportedOperationException( > > >> > "The transform " + transform + " is currently not > supported."); > > >> > } > > >> > applyStreamingTransform(transform, node, translator); > > >> > > > >> > > > >> > but applyCanTranslate and applyStreamingTransform always require > > NotNull > > >> > translator > > >> > as result if you try use side input in your code then you will cause > > NPE > > >> > > > >> > Maybe Aljoscha Krettek could describe how this code must work? > > >> > > > >> > > > >> > Regards, > > >> > Alexey Diomin > > >> > > > >> > > > > > > > > > |
Hi Alexey,
You don't have to set the streaming mode. The Flink Runner will automatically choose to use streaming mode when it discovers UnboundedSources like Kafka. I'm wondering why that didn't work in your case. I just ran your example and it chose streaming mode and didn't return an error during pipeline translation. So I'm curious, which version of Beam are you working with? Best, Max On Wed, Aug 31, 2016 at 12:34 PM, Demin Alexey <[hidden email]> wrote: > Thanks > > with if (translator == null || !applyCanTranslate(transform, node, > translator)) all working as expectected > > > Regards, > Alexey Diomin > > > 2016-08-31 14:12 GMT+04:00 Aljoscha Krettek <[hidden email]>: > >> Ah I see, an unbounded source, such as the Kafka source does not work in >> batch mode (which streamStreaming(false) enables). The code should work in >> streaming mode if you apply some window that is compatible with the >> side-input window to the main input. >> >> I think the code in streaming still works because there cannot be cases >> where the translator is null right now. The correct check should be this, >> though: >> if (translator == null || !applyCanTranslate(transform, node, translator)) >> >> Cheers, >> Aljoscha >> >> On Wed, 31 Aug 2016 at 12:07 Demin Alexey <[hidden email]> wrote: >> >> > Program for reproduce >> > >> > https://gist.github.com/xhumanoid/d784a4463a45e68acb124709a521156e >> > >> > 1) options.setStreaming(false); - we have NPE and i can't understand how >> > code work >> > 2) options.setStreaming(true); - pipeline can compile (he still have >> > error, but it's my incorrect work with window) >> > >> > >> > 2016-08-31 13:53 GMT+04:00 Demin Alexey <[hidden email]>: >> > >> > > Hi >> > > >> > > If we can change code on translator != null then next line ( >> > > applyStreamingTransform(transform, node, translator); ) will cause NPE >> > > >> > > It's main problem why I don't understand code: >> > > >> > > x = null; >> > > if (x == null && f1_null_value_forbid(x)) { ..} >> > > f2_null_value_forbid(x); >> > > >> > > change (x == null) => (x !=null) simple change point of NPE >> > > >> > > >> > > 2016-08-31 13:43 GMT+04:00 Aljoscha Krettek <[hidden email]>: >> > > >> > >> Hi, >> > >> I think this is more suited for the Beam dev list. Nevertheless, I >> think >> > >> this is a coding error and the condition should be >> > >> if (translator != null && !applyCanTranslate(transform, node, >> > translator)) >> > >> >> > >> With what program did you encounter an NPE, it seems to me that this >> > >> should >> > >> rarely happen, at least it doesn't happen in all the Beam runner >> tests. >> > >> >> > >> Cheers, >> > >> Aljoscha >> > >> >> > >> On Wed, 31 Aug 2016 at 11:27 Demin Alexey <[hidden email]> wrote: >> > >> >> > >> > Hi >> > >> > >> > >> > Sorry if i mistake with mailing list. >> > >> > >> > >> > After BEAM-102 was solved in FlinkStreamingPipelineTranslator we >> have >> > >> code >> > >> > in visitPrimitiveTransform: >> > >> > >> > >> > >> > >> > if (translator == null && applyCanTranslate(transform, node, >> > >> translator)) { >> > >> > LOG.info(node.getTransform().getClass().toString()); >> > >> > throw new UnsupportedOperationException( >> > >> > "The transform " + transform + " is currently not >> supported."); >> > >> > } >> > >> > applyStreamingTransform(transform, node, translator); >> > >> > >> > >> > >> > >> > but applyCanTranslate and applyStreamingTransform always require >> > NotNull >> > >> > translator >> > >> > as result if you try use side input in your code then you will cause >> > NPE >> > >> > >> > >> > Maybe Aljoscha Krettek could describe how this code must work? >> > >> > >> > >> > >> > >> > Regards, >> > >> > Alexey Diomin >> > >> > >> > >> >> > > >> > > >> > >> |
Free forum by Nabble | Edit this page |