Skip to main content

flink_source


title: flink 源码分析 date: 2023-08-04 22:04:41 tags:

  • java
  • flink

源码分析

堆栈

createStreamOperator:39, CodeGenOperatorFactory (org.apache.flink.table.runtime.operators)
createOperator:81, StreamOperatorFactoryUtil (org.apache.flink.streaming.api.operators)
createOperator:858, OperatorChain (org.apache.flink.streaming.runtime.tasks)
createOperatorChain:826, OperatorChain (org.apache.flink.streaming.runtime.tasks)
createOutputCollector:722, OperatorChain (org.apache.flink.streaming.runtime.tasks)
<init>:202, OperatorChain (org.apache.flink.streaming.runtime.tasks)
<init>:60, RegularOperatorChain (org.apache.flink.streaming.runtime.tasks)
restoreInternal:707, StreamTask (org.apache.flink.streaming.runtime.tasks)
restore:693, StreamTask (org.apache.flink.streaming.runtime.tasks)
run:-1, Task$$Lambda/0x00000251b8bab3b8 (org.apache.flink.runtime.taskmanager)
runWithSystemExitMonitoring:953, Task (org.apache.flink.runtime.taskmanager)
restoreAndInvoke:922, Task (org.apache.flink.runtime.taskmanager)
doRun:746, Task (org.apache.flink.runtime.taskmanager)
run:562, Task (org.apache.flink.runtime.taskmanager)
runWith:1596, Thread (java.lang)
run:1583, Thread (java.lang)

public class StreamExecCalc$8 extends org.apache.flink.table.runtime.operators.TableStreamOperator
implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {

private final Object[] references;
private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$1;

private final org.apache.flink.table.data.binary.BinaryStringData str$3 = org.apache.flink.table.data.binary.BinaryStringData.fromString("fff");

org.apache.flink.table.data.BoxedWrapperRowData out = new org.apache.flink.table.data.BoxedWrapperRowData(2);
private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

public StreamExecCalc$8(
Object[] references,
org.apache.flink.streaming.runtime.tasks.StreamTask task,
org.apache.flink.streaming.api.graph.StreamConfig config,
org.apache.flink.streaming.api.operators.Output output,
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
this.references = references;
typeSerializer$1 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
this.setup(task, config, output);
if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
.setProcessingTimeService(processingTimeService);
}
}

@Override
public void open() throws Exception {
super.open();

}

@Override
public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();

org.apache.flink.table.data.binary.BinaryStringData field$0;
boolean isNull$0;
org.apache.flink.table.data.binary.BinaryStringData field$2;
boolean isNull$4;
org.apache.flink.table.data.binary.BinaryStringData result$5;
org.apache.flink.table.data.binary.BinaryStringData field$6;
boolean isNull$6;
org.apache.flink.table.data.binary.BinaryStringData field$7;



isNull$0 = in1.isNullAt(0);
field$0 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$0) {
field$0 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
}
field$2 = field$0;
if (!isNull$0) {
field$2 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$1.copy(field$2));
}


isNull$6 = in1.isNullAt(1);
field$6 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$6) {
field$6 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1));
}
field$7 = field$6;
if (!isNull$6) {
field$7 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$1.copy(field$7));
}


out.setRowKind(in1.getRowKind());







result$5 = org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(( isNull$0 ) ? null : (field$2), ( false ) ? null : (((org.apache.flink.table.data.binary.BinaryStringData) str$3)));

isNull$4 = (result$5 == null);

if (isNull$4) {
out.setNullAt(0);
} else {
out.setNonPrimitiveValue(0, result$5);
}



if (isNull$6) {
out.setNullAt(1);
} else {
out.setNonPrimitiveValue(1, field$7);
}


output.collect(outElement.replace(out));


}



@Override
public void finish() throws Exception {

super.finish();
}

@Override
public void close() throws Exception {
super.close();

}


}

相关阅读