Preliminary notes:
(1) The messages stored in Kafka are not plain flat JSONObjects; each message wraps its payload in a JSONArray nested under the "data" key, like this:

{
   "title":"LineNameData",
   "data":[
      {
         "LINE_NAME":"A1",
         "TYPE":"R"
      },
      {
         "LINE_NAME":"A2",
         "TYPE":"SR"
      },
      {
         "LINE_NAME":"A3",
         "TYPE":"SSR"
      }
   ]
}
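
Both keys matter later: the filter in section 3 matches on the title value, and the parsing in section 5 reads the record array out of the data key.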

(2) Versions used: Java 1.8, Flink 1.17.0, Kafka 3.6.2.

1. pom configuration

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.17.0</flink.version>
  </properties>

  <dependencies>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.7.1</version>
    </dependency>

    <!-- Optional: add this if the job also needs to query MySQL -->
    <dependency>
      <groupId>com.mysql</groupId>
      <artifactId>mysql-connector-j</artifactId>
      <version>8.2.0</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

 
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.compal.App</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
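
With this assembly configuration, running mvn clean package should produce a *-jar-with-dependencies.jar under target/ with com.compal.App as its entry point; it can be launched with java -jar or submitted to a cluster with flink run. One caveat when submitting to a cluster: flink-java, flink-streaming-java, and flink-clients are usually given <scope>provided</scope>, since the Flink runtime already ships them.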

2. App.java

package com.compal;

public class App {
    public static void main(String[] args) throws Exception {
        System.out.println("is running");
        KafkaGetUtil.getData(); // the Kafka logic is encapsulated in this class, so we simply call it here
    }
}

3. KafkaGetUtil.java

package com.compal;

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

public class KafkaGetUtil {
    private final static String kafkaServer = "yourServer";
    private final static String getTopic = "yourTopic";
    private final static String groupId = "yourConsumer";

    public static void getData() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism of the Flink job
        env.setParallelism(1);

        // kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(kafkaServer)
                .setTopics(getTopic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();

        DataStreamSource<String> fromSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // fromSource carries the values of every message in the topic. Since we set the title of our messages to LineNameData, this step filters out everything whose payload does not contain "LineNameData"
        SingleOutputStreamOperator<String> filter = fromSource
                .filter(value -> value.contains("LineNameData"));

        // Hand the filtered data to the DataProcess sink for processing
        filter.addSink(new DataProcess());

        env.execute();
    }
}
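
A note on setStartingOffsets: OffsetsInitializer.earliest() replays the whole topic on every start, which is convenient for testing. For production-style consumption, the connector also offers OffsetsInitializer.latest() (only new messages) and OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST) (resume from the consumer group's committed offsets).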

4. ChangeLine.java
Create a class to hold the final result you want; it makes assigning values, comparing objects, and similar operations easier during development. A minimal sketch follows.
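
The original post leaves this class to the reader, so the following is only a sketch inferred from the fields used in the sample message and in DataProcess (LINE_NAME and TYPE); extend it with whatever your final result needs:

package com.compal;

/*
 * Minimal POJO for one line record. The field names mirror the JSON keys,
 * so fastjson's toJavaList can populate them directly.
*/
public class ChangeLine {
    private String LINE_NAME;
    private String TYPE;

    public String getLINE_NAME() {
        return LINE_NAME;
    }

    public void setLINE_NAME(String LINE_NAME) {
        this.LINE_NAME = LINE_NAME;
    }

    public String getTYPE() {
        return TYPE;
    }

    public void setTYPE(String TYPE) {
        this.TYPE = TYPE;
    }
}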

5. DataProcess.java
For how to use Java streams and the functions they support, see this article: Java Stream API中常用的函数及其示例 – YW
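
As a quick illustration before the full file (this snippet is not part of DataProcess.java), the placeholder filter step below could become something like this, assuming the hypothetical ChangeLine getters sketched in section 4:

// Hypothetical example: keep only lines of TYPE "SR", sorted by name.
List<ChangeLine> srLines = lineDataList.stream()
        .filter(line -> "SR".equals(line.getTYPE()))
        .sorted(Comparator.comparing(ChangeLine::getLINE_NAME))
        .collect(Collectors.toList());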

package com.compal;

import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

/*
 * Processes the captured data; the class needs to extend RichSinkFunction.
*/

public class DataProcess extends RichSinkFunction<String> {

    // Overriding invoke gives us each record handed over by the stream, i.e. the value
    @Override
    public void invoke(String value) throws Exception {
        JSONObject kafkaData = JSONObject.parseObject(value);
        JSONArray lineData = kafkaData.getJSONArray("data"); // get the value for the "data" key

        // Convert the parsed array into ChangeLine objects to simplify the processing below
        List<ChangeLine> lineDataList = lineData.toJavaList(ChangeLine.class);

        // Plain Java stream processing handles the basic aggregation, filtering, sorting, etc.
        List<ChangeLine> nowLineList = lineDataList.stream().filter(obj -> {
            return true; // your filter logic goes here (this placeholder keeps everything)
        }).collect(Collectors.toList());

        // Holds the processed data (ready to hand back to Kafka)
        JSONArray lineJsonArray = new JSONArray();
        for (ChangeLine line : nowLineList) {
            // iterate over the data and do the actual processing......

            // convert each entry to a JSONObject so it can be sent back to Kafka
            JSONObject lineJsonObject = new JSONObject();
            lineJsonObject.put("LINE_NAME", line.getLINE_NAME());
            lineJsonArray.add(lineJsonObject);
        }

        // Build the final JSONObject and send it to Kafka
        JSONObject finalJsonObject = new JSONObject();
        finalJsonObject.put("LineData", lineJsonArray);
        finalJsonObject.put("title", "ChangeLine");
        String changeLineJsonString = finalJsonObject.toString();

        Properties props = new Properties();
        props.put("bootstrap.servers", "yourKafkaServer");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // Create the Kafka producer (one per record for simplicity; see the note after this class)
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send the message to Kafka
        ProducerRecord<String, String> record = new ProducerRecord<>("yourTopicName", "ChangeLine", changeLineJsonString);
        producer.send(record, (RecordMetadata metadata, Exception e) -> {
            if (e != null) {
                System.out.println("Failed to send message: " + e.getMessage());
            } else {
                System.out.println("Message sent successfully: " + metadata.toString());
            }
        });

        // Close the producer (close() flushes pending records first)
        producer.close();

    }

}
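
One design note: invoke() above creates and closes a KafkaProducer for every record, which keeps the demo simple but is expensive. A common alternative, sketched below under the assumption that the properties stay as shown, is to hold one producer per sink instance and tie it to the open()/close() lifecycle that RichSinkFunction inherits (open takes org.apache.flink.configuration.Configuration in Flink 1.17):

// Inside DataProcess: one producer per sink instance instead of one per record.
private transient KafkaProducer<String, String> producer;

@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "yourKafkaServer");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    producer = new KafkaProducer<>(props);
}

@Override
public void close() throws Exception {
    if (producer != null) {
        producer.close(); // flushes pending records before closing
    }
}

With this in place, invoke() only builds the ProducerRecord and calls producer.send(...). Flink also ships its own KafkaSink in the same connector, which handles these lifecycle details and delivery guarantees for you; the official documentation linked below covers it.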

Official documentation for reference: Kafka | Apache Flink
