Flink 实时数仓(二)【ODS 层开发】

前言

        最近投了不少的实习,也收到不错的反馈,虽然是中小公司偏多,但是毕竟现在这个环境双非进大厂实习可不同当年了。可惜的是学院不放人,无奈啊,遍身罗绮者,不是养蚕人。我累死累活肝了两年了,好不容易找到个不错的实习,可是学院...

        不骂了,没那功夫扯淡。回归主题,实时数仓将来真要是有机会从事的话那真不错,也不枉我当时学Flink的时候花了一整个学期,没暖气没电源的教室跑着三台虚拟机,CPU爆满,每天学俩小时就没电了,但还是花了半个学期学完了。今天开始正式的数仓搭建。

1、ODS 层开发

        在实时数仓这里,当我们把数据采集到 Kafka (topic_log 和 topic_db 主题)的时候,其实 ODS 层的数据存储任务就已经完成了(ODS 层的任务:数据的存储和备份)。接下来我们需要做的就是保持数据的有序:

1.1、Kafka 数据有序

        Kafka 只能保证单分区内有序,并不能保证全局有序。

1.1.1、设置 Kafka 分区默认个数

        这里我们需要设置 Kafka 的分区个数为 4,毕竟实时数仓对数据的吞吐量、并发性能的要求是比较高的,所以我们不能为了数据的有序性而把数据到挤到一个分区中:

vim /opt/module/kafka_2.12-3.4.1/config/server.properties
// 修改配置
num.partition=4

1.1.2、设置 Flink 精准一次

        Flink 程序从 Kafka 消费数据时会启动同属于一个消费者组的四个消费者,Kafka 消费者的默认分区分配策略是 Range + CooperativeSticky,消费者数和分区数相同时,每个消费者消费一个分区的数据。只要单分区数据有序,即可保证 Flink 单个并行度数据有序。

        我们这里的 Kafka 版本是 3.0.0,在 Kafka 1.x 及之后的版本中,保证数据单分区有序,条件如下:

不开启 Kafka 幂等性的情况

max.in.flight.requests.per.connection=1

开启 Kafka 幂等性的情况: 

// 必须小于等于5
max.in.flight.requests.per.connection=5

        在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。

        默认情况下幂等性是开启的,max.in.flight.requests.per.connection 默认值为 5,所以单分区数据默认是有序的,不需要做任何配置。

综上,我们可以保证 Flink 程序单个并行度的数据有序。

1.2、Maxwell 同步历史维度数据

        我们的实时计算不需要考虑历史的事实数据(比如下单、加购),但要考虑历史维度数据,毕竟没有维度只有业务过程是没法进行计算的。

1.2.1、为什么要同步历史维度数据而不同步历史业务数据

        当一个用户进行下单这个业务过程的时候,比如 user_id 为 1001 的用户在 province_id 为 17 的省份下单了 sku_id 为 10 的商品,并使用了 cupon_id 为 1001 的优惠券。

        当这个数据传到我们实时数仓的时候,我们必须知道用户是谁,它买了什么东西,有没有使用优惠券、使用了什么类型的优惠券、这个省份 id 是什么地方、下单方式是什么。这就涉及到了很多维度数据,所以我们必须提前把维度数据准备好,等到数据来的时候直接拿来用而不是才去业务库同步。

        在 ODS 层这里我们只需要原封不动的把维度数据导入到 Kafka ,等到搭建 DIM 层的时候直接从 ODS 层拿,而不是让 DIM 层去业务数据库中同步。

        在我们这个项目中,我们需要通过 Maxwell 同步下面这些维度表到 Kafka 的 

activity_info
activity_rule
activity_sku
base_category1
base_category2
base_category3
base_province
base_region
base_trademark
coupon_info
coupon_range
financial_sku_cost
sku_info
spu_info
user_info

编写同步脚本:

#!/bin/bash

# 该脚本的作用是初始化所有的业务数据,只需执行一次

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    for tab in $@
    do
      $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table ${tab} --config $MAXWELL_HOME/config.properties
    done
}

case $1 in
activity_info | activity_rule | activity_sku | base_category1 | base_category2 | base_category3 | base_dic | base_province | base_region | base_trademark | coupon_info | coupon_range | financial_sku_cost | sku_info | spu_info | user_info)
  import_data $1 
  ;;
"all")
  import_data activity_info activity_rule activity_sku base_category1 base_category2 base_category3 base_dic base_province base_region base_trademark coupon_info coupon_range financial_sku_cost sku_info spu_info user_info
  ;;
esac

思考

        我们之前在学习离线数仓的时候, 使用 Maxwell 和 DataX 来同步业务数据,其中的 Maxwell 在离线数仓中其实并没有什么作用,至于削峰解耦在离线数仓中是根本不用考虑的。而如果不使用 Kafka ,我们可以直接通过 Flume 直接采集到 HDFS。

        在离线数仓中使用 Maxwell 的作用完全是为了现在学习实时数仓时,方便 Flink 来直接从 Kafka 去读取数据。但是 Flume 的数据中包含的 Event Header ,它对于实时数仓来说是完全没有用的,所以我们当时为了妥协实时数仓,就把 Flume 数据中的 Header 给去掉了,但是也就引入了零点漂移的问题,毕竟 Event Header 中保存着 timestamp 信息,而它在经过 Kafka 之后,会被 Kafka 给它添加一个 Header ,Header 中的 timestamp 时间默认为 Kafka 处理的时间,所以我们当时又设置了 Flume 拦截器来把 Header 中的 timestamp 值设置为 body 中的时间戳(因为拦截器只能设置在 Source 和 Channel 之间,所以还需要一个 Flume 再从 Kafka 读出来)。

Flume 拦截器代码

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor {

    private ArrayList<Event> events = new ArrayList<>();

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

总结

        实时数仓中 ODS 层的工作很简单,我们只需要用 Maxwell 把业务数据库中的维度表进行实时同步即可。至于服务器里的日志数据我们根本不需要实时处理!一个日志数据没必要实时处理!所以实时数仓中我们也就不需要用 Flume 这个工具。

        所以对于实时数仓的 ODS 层,我们主要用的就是 Maxwell 来同步维度数据,而对于事实数据(比如下单、加购),等到 DWD 层再进行处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/579454.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《架构风清扬-Java面试系列第25讲》聊聊ArrayBlockingQueue的特点及使用场景

ArrayBlockingQueue是BlockingQueue接口的一个实现类之一 这个属于基础性问题&#xff0c;老规矩&#xff0c;我们将从使用场景和代码示例来进行讲解 来&#xff0c;思考片刻&#xff0c;给出你的答案 1&#xff0c;使用场景 实现&#xff1a;基于数组实现的有界阻塞队列&…

TCP/IP协议族中的TCP(二):解析其关键特性与机制

⭐小白苦学IT的博客主页⭐ ⭐初学者必看&#xff1a;Linux操作系统入门⭐ ⭐代码仓库&#xff1a;Linux代码仓库⭐ ❤关注我一起讨论和学习Linux系统 滑动窗口 在前面我们讨论了确认应答策略, 对每一个发送的数据段, 都要给一个ACK确认应答. 收到ACK后再发送下一个数据段.这样…

【Python】#5 基础文件IO详解

文章目录 一、文件概述二、文件操作1.文件的打开与关闭2. 文件的读写2.1 读取2.2 写入tips:CSV与JSON文件 一些文件操作小实验《清明》文本写入与读取《红楼梦》人物出现统计&#xff08;部分文本&#xff09; 一、文件概述 文件是数据的集合和抽象&#xff0c;类似&#xff0…

如何增强交友、婚恋平台、金融等平台的安全性

运营商二要素核验是一种数字身份验证方法&#xff0c;主要使用用户的手机号码和姓名作为核验要素。这两个要素被认为是最基本的用户身份信息&#xff0c;通过运营商的数据库来核实其真实性。 在实际操作中&#xff0c;用户需要提供手机号码和姓名进行验证。应用系统会调用接口…

全面了解俄罗斯的VK开户和Yandex投放及内容运营

俄罗斯的VKontakte&#xff08;简称VK&#xff09;和Yandex是两个重要的在线平台&#xff0c;对于希望在俄罗斯市场进行推广的企业来说&#xff0c;了解如何在这些平台上开户和投放广告以及内容运营是非常关键的。 俄罗斯vk广告如何开户&#xff1f; 通过上海上弦进行俄罗斯V…

手写一个RNN前向传播以及反向传播

前向传播 根据公式 st tanh (Uxt Wst-1 ba) ot softmax(Vst by ) m 3 词的个数 n 5 import numpy as np import tensorflow as tf # 单个cell 的前向传播过程 # 两个输入&#xff0c;x_t&#xff0c;s_prev,parameters def rnn_cell_forward(x_t,s_prev,parameter…

每日OJ题_DFS回溯剪枝⑧_力扣494. 目标和

目录 力扣494. 目标和 解析代码&#xff08;path设置成全局&#xff09; 解析代码&#xff08;path设置全局&#xff09; 力扣494. 目标和 494. 目标和 难度 中等 给你一个非负整数数组 nums 和一个整数 target 。 向数组中的每个整数前添加 或 - &#xff0c;然后串联…

SpringBoot + Vue实现Github第三方登录

前言&#xff1a;毕业设计终于好了&#xff0c;希望能有空多写几篇 1. 获取Github账号的Client ID和Client secrets 首先点击这个链接进入Github的OAuth Apps页面&#xff0c;页面展示如下&#xff1a; 之后我们可以创建一个新的apps: 填写资料&#xff1a; 创建之后就可以获…

WebGIS面试题(第六期)-GeoServer

WebGIS面试题&#xff08;第六期&#xff09; 以下题目仅为部分题目&#xff0c;全部题目在公众号 {GISer世界} &#xff0c;答案仅供参考!!! 因为本人之前做过相关项目用到了GeoServer&#xff0c;因此在简历上写了熟悉GeoServer。所以在相关面试中都有问到&#xff0c;所以我…

【项目】仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器(Http板块)

【项目】仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器&#xff08;Http板块&#xff09; 一、思路图二、Util板块1、Splite板块&#xff08;分词&#xff09;&#xff08;1&#xff09;代码&#xff08;2&#xff09;测试及测试结果i、第一种测试ii、第二种…

[论文阅读] 3D感知相关论文简单摘要

Adaptive Fusion of Single-View and Multi-View Depth for Autonomous Driving 提出了一个单、多视图融合深度估计系统&#xff0c;它自适应地集成了高置信度的单视图和多视图结果 动态选择两个分支之间的高置信度区域执行融合 提出了一个双分支网络&#xff0c;即一个以单…

查看笔记本电池容量/健康状态

1. 打开命令行提示符 快捷键“win R”后输入“cmd” 2. 在命令提示符中输入命令 “powercfg /batteryreport" 并回车 3. 查看文件 最后就可以看到笔记本的电池使用报告了

Promises: JavaScript异步编程的救星

Promises: JavaScript异步编程的救星 Promises&#xff08;承诺&#xff09;是JavaScript中处理异步操作的一种机制&#xff0c;它提供了一种更优雅和可读性更高的方式来处理异步代码。Promises的实现原理基于一种称为"Promise/A"规范的约定&#xff0c;该规范定义了…

[蓝桥杯2024]-Reverse:rc4解析(对称密码rc4)

无壳 查看ida 这里应该运行就可以得flag&#xff0c;但是这个程序不能直接点击运行 按照伪代码写exp 完整exp&#xff1a; keylist(gamelab) content[0xB6,0x42,0xB7,0xFC,0xF0,0xA2,0x5E,0xA9,0x3D,0x29,0x36,0x1F,0x54,0x29,0x72,0xA8, 0x63,0x32,0xF2,0x44,0x8B,0x85,0x…

visual studio2022,开发CMake项目添加rabbitmq库,连接到远程计算机并进行开发于调试

1.打开visual studio installer 。安装“用于 Windows 的 C CMake 工具” 2.新建CMake项目 3.点击VS的“工具”—>"选项“—>“跨平台”—>”连接管理器“,添加远程计算机。用来将VS编辑的代码传到服务器进行编译–连接—运行&#xff08;调试&#xff09;。 …

BIO、NIO与AIO

一 BIO 同步并阻塞(传统阻塞型)&#xff0c;服务器实现模式为一个连接一个线程&#xff0c;即客户端有连接请求时服务器端就需要启动一个线程进行处理. BIO&#xff08;Blocking I/O&#xff0c;阻塞I/O&#xff09;模式是一种网络编程中的I/O处理模式。在BIO模式中&#xf…

鸿蒙内核源码分析(任务调度篇) | 任务是内核调度的单元

任务即线程 在鸿蒙内核中&#xff0c;广义上可理解为一个任务就是一个线程 官方是怎么描述线程的 基本概念 从系统的角度看&#xff0c;线程是竞争系统资源的最小运行单元。线程可以使用或等待CPU、使用内存空间等系统资源&#xff0c;并独立于其它线程运行。 鸿蒙内核每个…

[蓝桥杯2024]-PWN:fd解析(命令符转义,标准输出重定向)

查看保护 查看ida 这里有一次栈溢出&#xff0c;并且题目给了我们system函数。 这里的知识点没有那么复杂 完整exp&#xff1a; from pwn import* pprocess(./pwn) pop_rdi0x400933 info0x601090 system0x400778payloadb"ca\\t flag 1>&2" print(len(paylo…

SAP PP学习笔记07 - 作业手顺(工艺路线Routing)

上一章讲了BOM的相关知识。 SAP PP学习笔记07 - 简单BOM&#xff0c;派生BOM&#xff0c;多重BOM&#xff0c;批量修改工具 CEWB_sap半成品有多个bom-CSDN博客 本章来讲作业手顺&#xff08;工艺路线Routing&#xff09;的相关知识。 1&#xff0c;作业手顺(工艺路线 Routing…

四、线段、矩形、圆、椭圆、自定义多边形、边缘轮廓和文本绘制(OpenCvSharp)

功能实现&#xff1a; 对指定图片上进行绘制线段、矩形、圆、椭圆、自定义多边形、边缘轮廓以及自定义文本 一、布局 用到了一个pictureBox和八个button 二、引入命名空间 using System; using System.Collections.Generic; using System.Drawing; using System.Windows.F…
最新文章