Handling JSON, Schema Issues, and Doris Quirks in an Apache SeaTunnel Pipeline | HackerNoon

By News Room. Published 5 April 2025; last updated 5 April 2025 at 2:44 AM.

Because our projects need to integrate data from different sources into our data warehouse, we chose Apache SeaTunnel from among many options (Comparison Reference).

Currently, the interface we are using does not require authentication. If authentication is needed in the future, we will discuss and test that as well.

Actual Usage

Apache SeaTunnel Version: 2.3.4

Without further ado, here is the final configuration file. Since I submit jobs as JSON through the `rest-api` method, the result is shown below:

The difference between using `rest` and `conf` lies in the job execution environment: `conf` uses `ClientJobExecutionEnvironment` (which, upon testing, also supports the JSON format), while `rest` uses `RestJobExecutionEnvironment`.
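For reference, here is a minimal sketch of building the REST submission request with the JDK's built-in `java.net.http` API. The host, port, and endpoint path are assumptions (the `/hazelcast/rest/maps/submit-job` path is the Zeta engine's submit endpoint as I understand it for 2.3.x); verify them against your own deployment.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class SubmitJobSketch {
    // Builds the submit request. Host/port are hypothetical placeholders;
    // the endpoint path should be checked against your SeaTunnel deployment.
    static HttpRequest buildSubmitRequest(String host, String jobJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + host + "/hazelcast/rest/maps/submit-job?jobName=SeaTunnel_Job"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jobJson))
                .build();
    }

    public static void main(String[] args) {
        String jobJson = "{\"env\":{\"job.mode\":\"BATCH\",\"job.name\":\"SeaTunnel_Job\"}}";
        HttpRequest request = buildSubmitRequest("127.0.0.1:5801", jobJson);
        System.out.println(request.method() + " " + request.uri());
        // To actually submit:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(request, java.net.http.HttpResponse.BodyHandlers.ofString());
    }
}
```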

Data Format Returned by the Interface

{
  "code": "0000",
  "msg": "Success",
  "data": {
    "records": [
      {
        "id": "1798895733824393218",
        "taskContent": "License02",
        "taskType": "License"
      }
    ]
  }
}
// The actual data is paginated; the above is a sample.

Integration Configuration

{
  "env": {
    "job.mode": "BATCH",
    "job.name": "SeaTunnel_Job"
  },
  "source": [
    {
      "result_table_name": "Table13367210156032",
      "plugin_name": "Http",
      "url": "http://*.*.*.*:*/day_plan_repair/page",
      "method": "GET",  
      "format": "json",
      "json_field": {   
        "id": "$.data.records[*].id",
        "taskContent": "$.data.records[*].taskContent",
        "taskType": "$.data.records[*].taskType"
      },
      // "pageing": {
      //   "page_field": "current", 
      //   "batch_size": 10 
      // },
      "schema": {
        "fields": {
          "id": "BIGINT",
          "taskContent": "STRING",
          "taskType": "STRING"
        }
      }
    }
  ],
  "transform": [
    {
      "field_mapper": {
        "id": "id", 
        "taskContent": "task_content",
        "taskType": "task_type"
      },
      "result_table_name": "Table13367210156033",
      "source_table_name": "Table13367210156032",
      "plugin_name": "FieldMapper"
    }
  ],
  "sink": [
    {
      "source_table_name": "Table13367210156033",
      "plugin_name": "Doris",
      "fenodes": "*.*.*.*:*",
      "database": "test",
      "password": "****",
      "username": "****",
      "table": "ods_day_plan",
      "sink.label-prefix": "test-ods_day_plan",
      "sink.enable-2pc": false,
      "data_save_mode": "APPEND_DATA",
      "schema_save_mode": "CREATE_SCHEMA_WHEN_NOT_EXIST",
      "save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP\n UNIQUE KEY (id)\n DISTRIBUTED BY HASH (id)\n PROPERTIES (\n \"replication_allocation\" = \"tag.location.default: 1\",\n \"in_memory\" = \"false\",\n  \"storage_format\" = \"V2\",\n \"disable_auto_compaction\" = \"false\"\n )",
      "sink.enable-delete": true,
      "doris.config": {
        "format": "json",
        "read_json_by_line": "true"
      }
    }
  ]
}

Issues Encountered During Usage

Handle Save Mode Failed

Caused by: java.sql.SQLException: errCode = 2, detailMessage = Syntax error in line 21:
 UNIQUE KEY ()
             ^
Encountered: )
Expected: IDENTIFIER

Solution: see [issue #6646](https://github.com/apache/seatunnel/issues/6646).

This issue was resolved by customizing the `save_mode_create_template` field in the configuration file: the auto-generated DDL ended up with an empty `UNIQUE KEY ()` clause, so supplying a template with an explicit key (here `UNIQUE KEY (id)`) produces valid DDL. The template can be adjusted to business needs.
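To make the fix concrete, here is a toy rendering of the template placeholders. The Doris connector performs this substitution internally; the column list below is illustrative, not what the connector literally generates.

```java
public class TemplateRenderDemo {
    // Substitutes the ${...} placeholders the way the Doris sink's
    // save_mode_create_template is filled in, yielding DDL with an
    // explicit UNIQUE KEY instead of an empty one.
    static String render(String template, String database, String table, String fields) {
        return template
                .replace("${database}", database)
                .replace("${table_name}", table)
                .replace("${rowtype_fields}", fields);
    }

    public static void main(String[] args) {
        String template = "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n"
                + " ${rowtype_fields}\n) ENGINE=OLAP\nUNIQUE KEY (id)\nDISTRIBUTED BY HASH (id)";
        // Illustrative column list; in practice the connector derives it
        // from the upstream schema.
        String fields = "`id` BIGINT,\n `task_content` STRING,\n `task_type` STRING";
        System.out.println(render(template, "test", "ods_day_plan", fields));
    }
}
```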

NoSuchMethodError

java.lang.NoSuchMethodError: retrofit2.Retrofit$Builder.client(Lshaded/okhttp3/OkHttpClient;)Lretrofit2/Retrofit$Builder;
	at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:179) ~[connector-influxdb-2.3.4.jar:2.3.4]
	at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:120) ~[connector-influxdb-2.3.4.jar:2.3.4]
	at org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient.getInfluxDB(InfluxDBClient.java:72) ~[connector-influxdb-2.3.4.jar:2.3.4]

When using the InfluxDB connector, I ran into a JAR package conflict: the `retrofit2` dependency used to create the HTTP connection clashed with the version bundled in the DataHub connector. Since I did not use `datahub`, removing the `datahub` connector solved the issue.

Apache Doris BIGINT Type Precision Loss Issue

See the post for details.

Configuring the Primary Key

When configuring the `save_mode_create_template` for Doris, the primary key column must be a numeric or date type.

The `id` field in the source schema is returned as a string, but its values are all-numeric IDs generated by the Snowflake algorithm, so the `BIGINT` type is used for automatic conversion.

This matters because the `UNIQUE KEY` in the sink's `save_mode_create_template` uses `id` as the primary key, and Doris requires that key columns be numeric or date types.

Personal Experience

  1. When there is only one source, transform, or sink, you can omit the `result_table_name` and `source_table_name` configuration items.

  2. Download the source code, add log statements where needed, then repackage and replace the JAR in the SeaTunnel runtime. This makes it much easier to understand the code and confirm behavior through logs.

  3. Building on the previous point, once you are familiar with the code you can do secondary development, for example to handle interfaces that require token authentication.

  4. Note that the JsonPath values in the source's `json_field` cannot extract values from complex types inside lists (`Array<Object>` or `Map<String, Object>`). Consider secondary development to work around this.

// Example:
{
  "code": "0000",
  "msg": "Success",
  "data": {
    "records": [
      {
        "id": "1798895733824393218",
        "taskContent": "License02",
        "taskType": "License",
        "region_list": [ // This nested list cannot be parsed and synced: $.data.records[*].region_list[*].id yields a different number of values than the other paths, causing a data/total mismatch error
          {
            "id":"1",
            "name": "11"
          },
          {
            "id":"1",
            "name": "11"
          }
        ]
      }
    ]
  }
}
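The failure mode in the example above comes from per-path record counts: `$.data.records[*].id` yields one value while `$.data.records[*].region_list[*].id` yields two, and the connector rejects configurations whose paths yield differing counts. A minimal sketch of that consistency check:

```java
import java.util.List;

public class RecordCountCheck {
    // The HTTP source requires every configured JsonPath to yield the same
    // number of values (one per record); nested lists break that invariant.
    static boolean consistent(List<?> a, List<?> b) {
        return a.size() == b.size();
    }

    public static void main(String[] args) {
        // Values each JsonPath would extract from the sample response above.
        List<String> ids = List.of("1798895733824393218"); // $.data.records[*].id -> 1 value
        List<String> regionIds = List.of("1", "1");        // $.data.records[*].region_list[*].id -> 2 values

        if (!consistent(ids, regionIds)) {
            System.out.println("inconsistent: " + ids.size() + " vs " + regionIds.size());
        }
    }
}
```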

Testing Code (Using JDK 17)

// Dependencies: Jayway JsonPath plus classes from SeaTunnel's HTTP connector
// (package paths below are per SeaTunnel 2.3.x and may differ slightly by version);
// `log` comes from Lombok's @Slf4j on the enclosing test class.
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ReadContext;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

    private static final Option[] DEFAULT_OPTIONS = {
            Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL
    };
    private JsonPath[] jsonPaths;
    private final Configuration jsonConfiguration =
            Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);

    @Test
    public void test5() {
        String data = """
                {
                    "code": "0000",
                    "msg": "Success",
                    "data": {
                        "records": [
                            {
                                "id": "1798895733824393218",
                                "taskContent": "12312312313"
                            }
                        ]
                    }
                }
                """;
        Map<String, String> map = new HashMap<>();
        map.put("id", "$.data.records[*].id");
        map.put("taskContent", "$.data.records[*].taskContent");
        JsonField jsonField = JsonField.builder().fields(map).build();
        initJsonPath(jsonField);
        data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString();
        log.error(data);
    }
    // The following code is from HttpSourceReader
    private void initJsonPath(JsonField jsonField) {
        jsonPaths = new JsonPath[jsonField.getFields().size()];
        for (int index = 0; index < jsonField.getFields().keySet().size(); index++) {
            jsonPaths[index] =
                    JsonPath.compile(
                            jsonField.getFields().values().toArray(new String[] {})[index]);
        }
    }

    private List<Map<String, String>> parseToMap(List<List<String>> datas, JsonField jsonField) {
        List<Map<String, String>> decodeDatas = new ArrayList<>(datas.size());
        String[] keys = jsonField.getFields().keySet().toArray(new String[] {});

        for (List<String> data : datas) {
            Map<String, String> decodeData = new HashMap<>(jsonField.getFields().size());
            final int[] index = {0};
            data.forEach(
                    field -> {
                        decodeData.put(keys[index[0]], field);
                        index[0]++;
                    });
            decodeDatas.add(decodeData);
        }

        return decodeDatas;
    }

    private List<List<String>> decodeJSON(String data) {
        ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data);
        List<List<String>> results = new ArrayList<>(jsonPaths.length);
        for (JsonPath path : jsonPaths) {
            List<String> result = jsonReadContext.read(path);
            results.add(result);
        }
        for (int i = 1; i < results.size(); i++) {
            List<?> result0 = results.get(0);
            List<?> result = results.get(i);
            if (result0.size() != result.size()) {
                throw new HttpConnectorException(
                        HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
                        String.format(
                                "[%s](%d) and [%s](%d) the number of parsing records is inconsistent.",
                                jsonPaths[0].getPath(),
                                result0.size(),
                                jsonPaths[i].getPath(),
                                result.size()));
            }
        }

        return dataFlip(results);
    }

    private List<List<String>> dataFlip(List<List<String>> results) {

        List<List<String>> datas = new ArrayList<>();
        for (int i = 0; i < results.size(); i++) {
            List<String> result = results.get(i);
            if (i == 0) {
                for (Object o : result) {
                    String val = o == null ? null : o.toString();
                    List<String> row = new ArrayList<>(jsonPaths.length);
                    row.add(val);
                    datas.add(row);
                }
            } else {
                for (int j = 0; j < result.size(); j++) {
                    Object o = result.get(j);
                    String val = o == null ? null : o.toString();
                    List<String> row = datas.get(j);
                    row.add(val);
                }
            }
        }
        return datas;
    }
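As a quick illustration of what `dataFlip` does: it transposes the column-per-JsonPath lists into one row per record. A standalone sketch mirroring that behavior:

```java
import java.util.ArrayList;
import java.util.List;

public class DataFlipDemo {
    // Transposes column-major JsonPath results (one list per path) into
    // row-major records (one list per record), like HttpSourceReader.dataFlip.
    static List<List<String>> dataFlip(List<List<String>> columns) {
        List<List<String>> rows = new ArrayList<>();
        for (int i = 0; i < columns.size(); i++) {
            List<String> column = columns.get(i);
            for (int j = 0; j < column.size(); j++) {
                if (i == 0) {
                    rows.add(new ArrayList<>()); // first column decides the row count
                }
                rows.get(j).add(column.get(j));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Two JsonPaths, each returning two values (one per record):
        List<List<String>> columns = List.of(
                List.of("id-1", "id-2"),
                List.of("content-1", "content-2"));
        System.out.println(dataFlip(columns)); // [[id-1, content-1], [id-2, content-2]]
    }
}
```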

I hope sharing this experience is helpful to everyone!
