闲话少续,直接上代码,参考官方和咨询钉钉实现

1. 导入maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>flinktest</groupId>
    <artifactId>com.flink</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>7</source>
                    <target>7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.0</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.10.0</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-kafka_2.11</artifactId>-->
<!--            <version>1.10.0</version>-->
<!--        </dependency>-->
        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-table-planner-blink_2.11</artifactId>-->
<!--            <version>1.10.0</version>-->
<!--&lt;!&ndash;            <scope>provided</scope>&ndash;&gt;-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-table-common</artifactId>-->
<!--            <version>1.10.0</version>-->
<!--&lt;!&ndash;            <scope>provided</scope>&ndash;&gt;-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.10.0</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.10.0</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>blink-3.2.2</version>
        </dependency>
    </dependencies>
</project>

   
   

2. 编写自定义函数、此处参考package org.apache.flink.table.runtime.functions.utils中的JsonUtils工具类

package TestKudu;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class GetJsonObject extends ScalarFunction {
    // 可选, open方法可以不写,若写的话需要import org.apache.flink.table.functions.FunctionContext;
    public long eval(String a) {
        return a == null ? 0 : a.length();
    }
//    public long eval(String b, String c) {
//        return eval(b) + eval(c);
//    }
    //可选,close方法可以不写
    @Override
    public void close() {
    }
    public static final Logger LOG = LoggerFactory.getLogger(GetJsonObject.class);
    public final Pattern patternKey = Pattern.compile("^([a-zA-Z0-9_\\-\\:\\s]+).*");
    public final Pattern patternIndex = Pattern.compile("\\[([0-9]+|\\*)\\]");
    public static final JsonFactory JSON_FACTORY = new JsonFactory();
    static {
        // Allows for unescaped ASCII control characters in JSON values
        JSON_FACTORY.enable(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS);
    }
    public static final ObjectMapper MAPPER = new ObjectMapper(JSON_FACTORY);
    public static final JavaType MAP_TYPE = TypeFactory.defaultInstance().constructMapType(Map.class, Object.class, Object.class);
    public static final JavaType LIST_TYPE = TypeFactory.defaultInstance().constructRawCollectionType(List.class);
    /**
     *An LRU cache using a linked hash map.
     */
    public static class HashCache<K, V> extends LinkedHashMap<K, V> {
        private static final int CACHE_SIZE = 16;
        private static final int INIT_SIZE = 32;
        private static final float LOAD_FACTOR = 0.6f;
        HashCache() {
            super(INIT_SIZE, LOAD_FACTOR);
        }
        private static final long serialVersionUID = 1;
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > CACHE_SIZE;
        }
    }
    /**
     *An ThreadLocal cache using a linked hash map.
     */
    public static class ThreadLocalHashCache<K, V> {
        private ThreadLocal<HashCache<K, V>> cache = new ThreadLocal<>();
        public V get(K key) {
            HashCache<K, V> m = cache.get();
            if (m == null) {
                m = new HashCache<>();
                cache.set(m);
            }
            return m.get(key);
        }
        public V put(K key, V value) {
            HashCache<K, V> m = cache.get();
            if (m == null) {
                m = new HashCache<>();
                cache.set(m);
            }
            return m.put(key, value);
        }
        public void remove() {
            cache.remove();
        }
    }
    public static ThreadLocalHashCache<String, Object> extractObjectCache = new ThreadLocalHashCache<String, Object>();
    public static ThreadLocalHashCache<String, String[]> pathExprCache = new ThreadLocalHashCache<String, String[]>();
    public static ThreadLocalHashCache<String, ArrayList<String>> indexListCache =
            new ThreadLocalHashCache<String, ArrayList<String>>();
    public static ThreadLocalHashCache<String, String> mKeyGroup1Cache = new ThreadLocalHashCache<String, String>();
    public static ThreadLocalHashCache<String, Boolean> mKeyMatchesCache = new ThreadLocalHashCache<String, Boolean>();
    private static ThreadLocal<GetJsonObject> instance = new ThreadLocal<GetJsonObject>();
    public static void remove(){
        instance.remove();
    }
    public static GetJsonObject getInstance() {
        if (null == instance.get()) {
            instance.set(new GetJsonObject());
        }
        return instance.get();
    }
    public String eval(String jsonString, String pathString) {
        if (jsonString == null || jsonString.isEmpty() || pathString == null
                || pathString.isEmpty() || pathString.charAt(0) != '$') {
            LOG.debug("jsonString is null or empty, or path is null or empty, or path is not start with '$'! " +
                    "jsonString: " + jsonString + ", path: " + pathString);
            return null;
        }
        String result = new String();
        int pathExprStart = 1;
        boolean isRootArray = false;
        if (pathString.length() > 1) {
            if (pathString.charAt(1) == '[') {
                pathExprStart = 0;
                isRootArray = true;
            } else if (pathString.charAt(1) == '.') {
                isRootArray = pathString.length() > 2 && pathString.charAt(2) == '[';
            } else {
                LOG.debug("path String illegal! path String: " + pathString);
                return null;
            }
        }
        // Cache pathExpr
        String[] pathExpr = pathExprCache.get(pathString);
        if (pathExpr == null) {
            pathExpr = pathString.split("\\.", -1);
            pathExprCache.put(pathString, pathExpr);
        }
        // Cache extractObject
        Object extractObject = extractObjectCache.get(jsonString);
        if (extractObject == null) {
            JavaType javaType = isRootArray ? LIST_TYPE : MAP_TYPE;
            try {
                extractObject = MAPPER.readValue(jsonString, javaType);
            } catch (Exception e) {
                LOG.debug(
                        "Exception when read json value with type :" + javaType.toString() +
                                ", and json string: " + jsonString, e);
                return null;
            }
            extractObjectCache.put(jsonString, extractObject);
        }
        for (int i = pathExprStart; i < pathExpr.length; i++) {
            if (extractObject == null) {
                LOG.debug("path look up fail at: " + pathExpr[i - 1 >= 0 ? i - 1 : 0] +
                        ", pathString: " + pathString +
                        "json: " + jsonString);
                return null;
            }
            extractObject = extract(extractObject, pathExpr[i], i == pathExprStart && isRootArray);
        }
        if (extractObject instanceof Map || extractObject instanceof List) {
            try {
                result = MAPPER.writeValueAsString(extractObject);
            } catch (Exception e) {
                LOG.debug(
                        "Exception when MAPPER.writeValueAsString :" +
                                extractObject.toString(), e);
                return null;
            }
        } else if (extractObject != null) {
            result = extractObject.toString();
        } else {
            LOG.debug("path look up fail at: " + (pathExpr.length - 1 >= 0 ? pathExpr[pathExpr.length - 1] : null) +
                    ", pathString: " + pathString +
                    "json: " + jsonString);
            return null;
        }
        return result;
    }
    public String[] getJsonObjectsWithoutDollar(String jsonString, String[] pathStrings) {
        if (jsonString == null || jsonString.isEmpty() || pathStrings == null
                || pathStrings.length == 0) {
            LOG.debug("jsonString is null or empty, or path is null or empty! " +
                    "jsonString: " + jsonString);
            return new String[0];
        }
        int pathExprStart = 1;
        boolean isRootArray = false;
        Object rootExtractObject = extractObjectCache.get(jsonString);
        if (rootExtractObject == null) {
            JavaType javaType = isRootArray ? LIST_TYPE : MAP_TYPE;
            try {
                rootExtractObject = MAPPER.readValue(jsonString, javaType);
            } catch (Exception e) {
                LOG.debug(
                        "Exception when read json value with type :" + javaType.toString() +
                                ", and json string: " + jsonString, e);
                return new String[0];
            }
            extractObjectCache.put(jsonString, rootExtractObject);
        }
        String[] result = new String[pathStrings.length];
        for (int i = 0; i < pathStrings.length; i++) {
            String pathString = "$." + pathStrings[i];
            if (pathString == null || pathString.length() == 0){
                result[i] = null;
                LOG.debug(i + "th path String is null or empty! " +
                        "pathString: " + pathString);
                continue;
            }
            if (pathString.length() > 1) {
                if (pathString.charAt(1) == '[') {
                    pathExprStart = 0;
                    isRootArray = true;
                } else if (pathString.charAt(1) == '.') {
                    isRootArray = pathString.length() > 2 && pathString.charAt(2) == '[';
                } else {
                    result[i] = null;
                    LOG.debug(i + "th path String illegal! path String: " + pathString);
                    continue;
                }
            }
            // Cache pathExpr
            String[] pathExpr = pathExprCache.get(pathString);
            if (pathExpr == null) {
                pathExpr = pathString.split("\\.", -1);
                pathExprCache.put(pathString, pathExpr);
            }
            // Cache extractObject
            Object extractObject = rootExtractObject;
            if (extractObject == null) {
                JavaType javaType = isRootArray ? LIST_TYPE : MAP_TYPE;
                try {
                    extractObject = MAPPER.readValue(jsonString, javaType);
                } catch (Exception e) {
                    LOG.debug(
                            "Exception when read json value with type :" + javaType.toString() +
                                    ", and json string: " + jsonString, e);
                    result[i] = null;
                    continue;
                }
                extractObjectCache.put(jsonString, extractObject);
            }
            for (int j = pathExprStart; j < pathExpr.length; j++) {
                if (extractObject == null) {
                    result[i] = null;
                    LOG.debug(i + "th path look up fail at: " + pathExpr[j - 1 >= 0 ? j - 1 : 0] +
                            ", pathString: " + pathString +
                            "json: " + jsonString);
                    continue;
                }
                extractObject = extract(extractObject, pathExpr[j], j == pathExprStart && isRootArray);
            }
            if (extractObject instanceof Map || extractObject instanceof List) {
                try {
                    result[i] = MAPPER.writeValueAsString(extractObject);
                } catch (Exception e) {
                    LOG.debug(
                            "Exception when MAPPER.writeValueAsString :" +
                                    extractObject.toString(), e);
                    result[i] = null;
                    continue;
                }
            } else if (extractObject != null) {
                result[i] = extractObject.toString();
            } else {
                result[i] = null;
                LOG.debug(i + "th path look up fail at: " +
                        (pathExpr.length - 1 >= 0 ? pathExpr[pathExpr.length - 1] : null) +
                        ", pathString: " + pathString +
                        "json: " + jsonString);
                continue;
            }
        }
        return result;
    }
    protected Object extract(Object json, String path, boolean skipMapProc) {
        // skip MAP processing for the first path element if root is array
        if (!skipMapProc) {
            // Cache patternkey.matcher(path).matches()
            Matcher mKey = null;
            Boolean mKeyMatches = mKeyMatchesCache.get(path);
            if (mKeyMatches == null) {
                mKey = patternKey.matcher(path);
                mKeyMatches = mKey.matches() ? Boolean.TRUE : Boolean.FALSE;
                mKeyMatchesCache.put(path, mKeyMatches);
            }
            if (!mKeyMatches.booleanValue()) {
                return null;
            }
            // Cache mkey.group(1)
            String mKeyGroup1 = mKeyGroup1Cache.get(path);
            if (mKeyGroup1 == null) {
                if (mKey == null) {
                    mKey = patternKey.matcher(path);
                    mKeyMatches = mKey.matches() ? Boolean.TRUE : Boolean.FALSE;
                    mKeyMatchesCache.put(path, mKeyMatches);
                    if (!mKeyMatches.booleanValue()) {
                        return null;
                    }
                }
                mKeyGroup1 = mKey.group(1);
                mKeyGroup1Cache.put(path, mKeyGroup1);
            }
            json = extractJsonWithkey(json, mKeyGroup1);
        }
        // Cache indexList
        ArrayList<String> indexList = indexListCache.get(path);
        if (indexList == null) {
            Matcher mIndex = patternIndex.matcher(path);
            indexList = new ArrayList<String>();
            while (mIndex.find()) {
                indexList.add(mIndex.group(1));
            }
            indexListCache.put(path, indexList);
        }
        if (indexList.size() > 0) {
            json = extractJsonWithIndex(json, indexList);
        }
        return json;
    }
    private AddingList jsonList = new AddingList();
    private static class AddingList extends ArrayList<Object> {
        @Override
        public java.util.Iterator<Object> iterator() {
            return Iterators.forArray(toArray());
        }
        @Override
        public void removeRange(int fromIndex, int toIndex) {
            super.removeRange(fromIndex, toIndex);
        }
    }
    protected Object extractJsonWithIndex(Object json, ArrayList<String> indexList) {
        jsonList.clear();
        jsonList.add(json);
        AddingList tempJsonList = new AddingList();
        for (String index : indexList) {
            int targets = jsonList.size();
            if (index.equalsIgnoreCase("*")) {
                for (Object array : jsonList) {
                    if (array instanceof List) {
                        for (int j = 0; j < ((List<Object>) array).size(); j++) {
                            jsonList.add(((List<Object>) array).get(j));
                        }
                    }
                }
            } else {
                for (Object array : jsonList) {
                    int indexValue = Integer.parseInt(index);
                    if (!(array instanceof List)) {
                        continue;
                    }
                    List<Object> list = (List<Object>) array;
                    if (indexValue >= list.size()) {
                        continue;
                    }
                    tempJsonList.add(list.get(indexValue));
                }
                jsonList.addAll(tempJsonList);
            }
            if (jsonList.size() == targets) {
                return null;
            }
            jsonList.removeRange(0, targets);
        }
        if (jsonList.isEmpty()) {
            return null;
        }
        return (jsonList.size() > 1) ? new ArrayList<Object>(jsonList) : jsonList.get(0);
    }
    protected Object extractJsonWithkey(Object json, String path) {
        if (json instanceof List) {
            List<Object> jsonArray = new ArrayList<Object>();
            for (int i = 0; i < ((List<Object>) json).size(); i++) {
                Object jsonElem = ((List<Object>) json).get(i);
                Object jsonObj = null;
                if (jsonElem instanceof Map) {
                    jsonObj = ((Map<String, Object>) jsonElem).get(path);
                } else {
                    continue;
                }
                if (jsonObj instanceof List) {
                    for (int j = 0; j < ((List<Object>) jsonObj).size(); j++) {
                        jsonArray.add(((List<Object>) jsonObj).get(j));
                    }
                } else if (jsonObj != null) {
                    jsonArray.add(jsonObj);
                }
            }
            return (jsonArray.size() == 0) ? null : jsonArray;
        } else if (json instanceof Map) {
            return ((Map<String, Object>) json).get(path);
        } else {
            return null;
        }
    }
}

   
   

3. 编写测试类实现

package TestKudu;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.JSON_EXISTS;
/**
 * @author 
 * @date 2020/3/19 16:57
 * @explain
 */
public class KafkaSinkTest {
    public static void main(String[] args) throws Exception{
        // 初始化 flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
//        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(env, fsSettings);
        // 生成数据源
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", ":9092");
        properties.setProperty("group.id", "test1");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("user_behavior1", new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();
        DataStream<String> stream = env
                .addSource(consumer);
         fsTableEnv.registerDataStream("table1",stream,"a");
//         fsTableEnv.registerFunction("testlength",new StringLengthUdf());
        fsTableEnv.registerFunction("json_value1",new GetJsonObject());
//        Table query = fsTableEnv.sqlQuery("select testlength(a) from table1");
        //{"name":"Liza", "password":"123"}
        Table query = fsTableEnv.sqlQuery("select json_value1(a,'$.password') from table1");
        DataStream<Row> dsRow = fsTableEnv.toAppendStream(query, Row.class);
        dsRow.print();
//
        env.execute("sink-test");
    }
}

   
   
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐