starrock通过导入实现数据变更

当数据文件中同时包含 UPSERT 和 DELETE 操作时,必须添加 __op 字段,并且确保数据文件中包含一个代表操作类型的列,取值为 0 或 1。其中,取值为 0 时代表 UPSERT 操作,取值为 1 时代表 DELETE 操作。

数据样例

  1. 准备数据文件。

    a. 在本地文件系统创建一个 CSV 格式的数据文件 example3.csv。文件包含四列,分别代表用户 ID、用户姓名、用户得分和操作类型,如下所示:

    101,Tom,100,1
    102,Sam,70,0
    103,Stan,80,0

    b. 把 example3.csv 文件中的数据上传到 Kafka 集群的 topic3 中。

  2. 准备 StarRocks 表。

    a. 在数据库 test_db 中创建一张名为 table3 的主键模型表。表包含 idname 和 score 三列,分别代表用户 ID、用户名称和用户得分,主键为 id 列,如下所示:

    CREATE TABLE `table3`
    (
    `id` int(11) NOT NULL COMMENT "用户 ID",
    `name` varchar(65533) NOT NULL COMMENT "用户姓名",
    `score` int(11) NOT NULL COMMENT "用户得分"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

    说明

    自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 确定分桶数量。

    b. 向 table3 表中插入数据,如下所示:

    INSERT INTO table3 VALUES
    (101, 'Tom', 100),
    (102, 'Sam', 90);

导入数据

通过导入,把 example3.csv 文件中 id 为 101 的数据从 table3 表中删除,把 example3.csv 文件中 id 为 102 的数据更新到 table3 表,并且把 example3.csv 文件中 id 为 103 的数据插入到 table3 表:

  • 通过 Stream Load 导入:

    curl --location-trusted -u <username>:<password> \
    -H "Expect:100-continue" \
    -H "label:label4" \
    -H "column_separator:," \
    -H "columns: id, name, score, temp, __op = temp" \
    -T example3.csv -XPUT \
    http://<fe_host>:<fe_http_port>/api/test_db/table3/_stream_load

    说明

    上述示例中,通过 columns 参数把 example3.csv 文件中代表组别代码的第四列临时命名为 temp,然后定义 __op 字段等于临时命名的 temp 列。这样,StarRocks 可以根据 example3.csv 文件中第四列的取值是 0 还是 1 来确定执行 UPSERT 还是 DELETE 操作。

代码实现

数据转成JSON,自动增加一个是否删除标志

  private String toJsonString(TapTable tapTable, Map<String, Object> record, boolean delete) throws JsonProcessingException {    if (null == tapTable) throw new IllegalArgumentException("TapTable cannot be null");    if (null == record) throw new IllegalArgumentException("Record cannot be null");    LinkedHashMap<String, Object> linkedRecord = new LinkedHashMap<>();    for (String field : tapTable.getNameFieldMap().keySet()) {      Object value = record.get(field);      if (null == value) {        linkedRecord.put(field, null);      } else {        linkedRecord.put(field, value.toString());      }    }    linkedRecord.put(Constants.STARROCKS_DELETE_SIGN, delete ? 1 : 0);    return objectMapper.writeValueAsString(linkedRecord);  }

stream load导入

public RespContent put(final TapTable table) throws StreamLoadException, StarRocksRetryableException {    StarRocksConfig config = starRocksContext.getStarRocksConfig();    StarRocksContext.WriteFormat writeFormat = starRocksContext.getWriteFormat();    String loadUrl = null;    try {      String[] httpNodes = config.getStarRocksHttp().split(",");      loadUrl = buildLoadUrl(httpNodes[new Random().nextInt(httpNodes.length)], config.getDatabase(), table.getId());      TapLogger.info("starrocks-load: loadUrl = {}", loadUrl);      final String prefix = buildPrefix(table.getId());
String label = prefix + "-" + UUID.randomUUID(); List<String> columns = new ArrayList<>(); for (Map.Entry<String, TapField> entry : table.getNameFieldMap().entrySet()) { columns.add(entry.getKey()); } // add the STARROCKS_DELETE_SIGN at the end of the column columns.add(Constants.STARROCKS_DELETE_SIGN); columns.add("__op = "+Constants.STARROCKS_DELETE_SIGN); HttpPutBuilder putBuilder = new HttpPutBuilder(); InputStreamEntity entity = new InputStreamEntity(recordStream, recordStream.getContentLength()); Collection<String> primaryKeys = table.primaryKeys(true); if (CollectionUtils.isEmpty(primaryKeys)) { putBuilder.setUrl(loadUrl) // 前端表单传出来的值和tdd json加载的值可能有差别,如前端传的pwd可能是null,tdd的是空字符串 .baseAuth(config.getUser(), config.getPassword()) .addCommonHeader() .addFormat(writeFormat) .addColumns(columns) .setLabel(label) .enableAppend() .setEntity(entity); } else { putBuilder.setUrl(loadUrl) // 前端表单传出来的值和tdd json加载的值可能有差别,如前端传的pwd可能是null,tdd的是空字符串 .baseAuth(config.getUser(), config.getPassword()) .addCommonHeader() .addFormat(writeFormat) .addColumns(columns) .setLabel(label) .enableDelete() .setEntity(entity); } HttpPut httpPut = putBuilder.build(); TapLogger.debug(TAG, "Call stream load http api, url: {}, headers: {}", loadUrl, putBuilder.header); return handlePreCommitResponse(httpClient.execute(httpPut)); } catch (StarRocksRetryableException e) { metrics.clear(); throw e; } catch (Exception e) { throw new StreamLoadException(String.format("Call stream load error: %s", e.getMessage()), e); } }


相关推荐

  • 1024 程序员节全体大会正式开幕:院士、技术英雄齐聚,看开源如何启动 AI 新纪元!
  • 华为辟谣将发射 1 万枚 6G 移动低轨卫星;雷军称新十年成为全球硬核科技引领者;首个软件专利获得者马丁·格茨离世|极客头条
  • 涨停在吃35%落袋,这3之只加入自选
  • 脱发秘籍:前端Chrome调试技巧最全汇总
  • uni-app x 来了 !!!
  • 找对方法,Rust 也可以很简单
  • JavaScript 实在太烂了!!!
  • AI时代下的管理变革
  • 你知道 Python 其实自带了小型数据库吗
  • MyBatis-Plus实现逻辑删除[MyBatis-Plus系列] - 492篇
  • Apollo,真香!
  • 双11要来了,推荐7款国产AI商拍工具,助你快速生成优质商品图
  • 分享 20 个提升效率的 JavaScript 缩写小技巧
  • 交互式数据可视化图表库 ApexCharts
  • 不用 SQL 的开源数据仓库
  • P0 故障,语雀全线宕机八小时
  • 在视觉提示中加入「标记」,微软等让GPT-4V看的更准、分的更细
  • 陶哲轩疯狂安利Copilot:它帮我完成了一页纸证明,甚至能猜出我后面的过程
  • Kaggle赛题总结:Bengali 语音识别
  • Kaggle LLM Science Exam 金牌方案汇总。