前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用 Kafka、Spark、Airflow 和 Docker 构建数据流管道指南

用 Kafka、Spark、Airflow 和 Docker 构建数据流管道指南

作者头像
大数据杂货铺
发布2023-11-27 17:17:11
6220
发布2023-11-27 17:17:11
举报
文章被收录于专栏:大数据杂货铺大数据杂货铺

在本指南中,我们将深入探讨构建强大的数据管道,用 Kafka 进行数据流处理、Spark 进行处理、Airflow 进行编排、Docker 进行容器化、S3 进行存储,Python 作为主要脚本语言。

为了说明这个过程,我们将使用 Random Name API,这是一个多功能工具,每次触发都会生成新的随机数据。它提供了许多企业日常处理实时数据的实用表示。我们第一步涉及一个 Python 脚本,该脚本经过精心设计,用于从该 API 获取数据。为了模拟数据的流式传输性质,我们将定期执行此脚本。这个脚本还将充当我们与 Kafka 的桥梁,将获取的数据直接写入 Kafka 主题。

随着我们的深入,Airflow 的有向无环图 (DAG) 发挥着关键作用。Airflow DAG 脚本编排我们的流程,确保我们的 Python 脚本像时钟一样运行,持续流式传输数据并将其输入到我们的管道中。一旦我们的数据到达 Kafka producer,Spark Structured Streaming 就会接过接力棒。使用这些数据,对其进行处理,然后将修改后的数据无缝写入 S3,确保其为后续分析过程做好准备。

项目的一个重要方面是其模块化架构。得益于 Docker 容器,每个服务,无论是 Kafka、Spark 还是 Airflow,都在隔离的环境中运行。不仅确保了平滑的互操作性,还简化了可扩展性和调试。

入门:先决条件和设置

对于这个项目,我们利用GitHub存储库来托管我们的整个设置,使任何人都可以轻松开始。

A、Docker:Docker 将成为我们编排和运行各种服务的主要工具。

  • 安装:访问 Docker 官方网站,下载并安装适合您操作系统的 Docker Desktop。
  • 验证:打开终端或命令提示符并执行 docker --version 以确保安装成功。

B、S3:AWS S3 是我们数据存储的首选。

  • 设置:登录 AWS 管理控制台,导航到 S3 服务,然后建立一个新存储桶,确保根据您的数据存储首选项对其进行配置。

C、设置项目:

  • 克隆存储库:首先,您需要使用以下命令从 GitHub 存储库克隆项目:
代码语言:javascript
复制
git clone <https://github.com/simardeep1792/Data-Engineering-Streaming-Project.git>
  • 导航到项目目录:
代码语言:javascript
复制
cd Data-Engineering-Streaming-Project
代码语言:javascript
复制
使用以下方式部署服务docker-compose:在项目目录中,您将找到一个

docker-compose.yml文件。该文件描述了所有服务。

docker network create docker_streaming
docker-compose -f docker-compose.yml up -d
该命令协调 Docker 容器中所有必要服务的启动,例如 Kafka、Spark、Airflow 等。

分解项目文件

1、docker-compose.yml

代码语言:javascript
复制
version: '3.7'

services:
  # Airflow PostgreSQL Database
  airflow_db:
    image: postgres:16.0
    environment:
      - POSTGRES_USER=${POSTGRES_USER}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
      - POSTGRES_DB=${POSTGRES_DB}
    logging:
      options:
        max-size: 10m
        max-file: "3"

  # Apache Airflow Webserver
  airflow_webserver:
    command: bash -c "airflow db init && airflow webserver && airflow users  create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin"
    image: apache/airflow:latest
    restart: always
    depends_on:
      - airflow_db
    environment:
      - LOAD_EX=${LOAD_EX}
      - EXECUTOR=${EXECUTOR}
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@airflow_db:5432/${POSTGRES_DB}
    logging:
      options:
        max-size: 10m
        max-file: "3"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./requirements.txt:/opt/airflow/requirements.txt
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

  # Zookeeper for Kafka
  kafka_zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      - ZOOKEEPER_CLIENT_PORT=${ZOOKEEPER_CLIENT_PORT}
      - ZOOKEEPER_SERVER_ID=${ZOOKEEPER_SERVER_ID}
      - ZOOKEEPER_SERVERS=kafka_zookeeper:2888:3888
    networks:
      - kafka_network
      - default

  # Kafka Broker Instances
  kafka_broker_1:
    extends:
      service: kafka_base
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092

  kafka_broker_2:
    extends:
      service: kafka_base
    environment:
      - KAFKA_BROKER_ID=2
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093

  kafka_broker_3:
    extends:
      service: kafka_base
    environment:
      - KAFKA_BROKER_ID=3
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094

  kafka_base:
    image: confluentinc/cp-kafka:latest
    environment:
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}
      - KAFKA_INTER_BROKER_LISTENER_NAME=${KAFKA_INTER_BROKER_LISTENER_NAME}
      - KAFKA_ZOOKEEPER_CONNECT=kafka_zookeeper:2181
      - KAFKA_LOG4J_LOGGERS=${KAFKA_LOG4J_LOGGERS}
      - KAFKA_AUTHORIZER_CLASS_NAME=${KAFKA_AUTHORIZER_CLASS_NAME}
      - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=${KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND}
    networks:
      - kafka_network
      - default

  # Kafka Connect
  kafka_connect:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - "8083:8083"
    environment:
      - CONNECT_BOOTSTRAP_SERVERS=${CONNECT_BOOTSTRAP_SERVERS}
      - CONNECT_REST_PORT=${CONNECT_REST_PORT}
      - CONNECT_GROUP_ID=${CONNECT_GROUP_ID}
      - CONNECT_CONFIG_STORAGE_TOPIC=${CONNECT_CONFIG_STORAGE_TOPIC}
      - CONNECT_OFFSET_STORAGE_TOPIC=${CONNECT_OFFSET_STORAGE_TOPIC}
      - CONNECT_STATUS_STORAGE_TOPIC=${CONNECT_STATUS_STORAGE_TOPIC}
      - CONNECT_KEY_CONVERTER=${CONNECT_KEY_CONVERTER}
      - CONNECT_VALUE_CONVERTER=${CONNECT_VALUE_CONVERTER}
      - CONNECT_INTERNAL_KEY_CONVERTER=${CONNECT_INTERNAL_KEY_CONVERTER}
      - CONNECT_INTERNAL_VALUE_CONVERTER=${CONNECT_INTERNAL_VALUE_CONVERTER}
      - CONNECT_REST_ADVERTISED_HOST_NAME=${CONNECT_REST_ADVERTISED_HOST_NAME}
      - CONNECT_LOG4J_ROOT_LOGLEVEL=${CONNECT_LOG4J_ROOT_LOGLEVEL}
      - CONNECT_LOG4J_LOGGERS=${CONNECT_LOG4J_LOGGERS}
      - CONNECT_PLUGIN_PATH=${CONNECT_PLUGIN_PATH}
    networks:
      - kafka_network
      - default

  # Kafka Schema Registry
  kafka_schema_registry:
    image: confluentinc/cp-schema-registry:latest
    ports:
      - "8081:8081"
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=${SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS}
      - SCHEMA_REGISTRY_HOST_NAME=${SCHEMA_REGISTRY_HOST_NAME}
      - SCHEMA_REGISTRY_LISTENERS=${SCHEMA_REGISTRY_LISTENERS}
    networks:
      - kafka_network
      - default

  # Kafka User Interface
  kafka_ui:
    container_name: kafka-ui-1
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8888:8080
    depends_on:
      - kafka_broker_1
      - kafka_broker_2
      - kafka_broker_3
      - kafka_schema_registry
      - kafka_connect
    environment:
      - KAFKA_CLUSTERS_0_NAME=${KAFKA_CLUSTERS_0_NAME}
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=${KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS}
      - KAFKA_CLUSTERS_0_SCHEMAREGISTRY=${KAFKA_CLUSTERS_0_SCHEMAREGISTRY}
      - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME=${KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME}
      - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS=${KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS}
      - DYNAMIC_CONFIG_ENABLED=${DYNAMIC_CONFIG_ENABLED}

    networks:
      - kafka_network
      - default

  # Apache Spark Master Node
  spark_master:
    image: bitnami/spark:3
    container_name: spark_master
    ports:
      - 8085:8080
    environment:
      - SPARK_UI_PORT=${SPARK_UI_PORT}
      - SPARK_MODE=${SPARK_MODE}
      - SPARK_RPC_AUTHENTICATION_ENABLED=${SPARK_RPC_AUTHENTICATION_ENABLED}
      - SPARK_RPC_ENCRYPTION_ENABLED=${SPARK_RPC_ENCRYPTION_ENABLED}
    volumes:
      - ./:/home
      - spark_data:/opt/bitnami/spark/data
    networks:
      - default
      - kafka_network

#volumes for data
volumes:
  spark_data:

#network for Kafka
networks:
  kafka_network:
    driver: bridge
  default:
    external:
      name: docker_streaming

项目设置的核心在于文件 docker-compose.yml 。它协调我们的服务,确保顺畅的通信和初始化。这是一个细分:

1)版本

使用 Docker Compose 文件格式版本“3.7”,确保与服务兼容。

2)服务

项目包含多项服务:

  • Airflow:
  • 数据库 ( airflow_db):使用 PostgreSQL 1。
  • Web 服务器 ( airflow_webserver):启动数据库并设置管理员用户。
  • Kafka:
  • Zookeeper ( kafka_zookeeper):管理 broker 元数据。
  • Brokers:三个实例(kafka_broker_1、2 和 3)。
  • 基本配置 ( kafka_base):Broker的常见设置。
  • Kafka Connect(kafka_connect):促进流处理。
  • 架构注册表 ( kafka_schema_registry):管理 Kafka 架构。
  • 用户界面 ( kafka_ui):Kafka 的可视化界面。
  • spark:
  • 主节点 ( spark_master):Apache Spark 的中央控制节点。

3)卷

利用持久卷spark_data来确保 Spark 的数据一致性。

4)网络

服务有两个网络:

  • Kafka Network ( kafka_network):专用于 Kafka。
  • 默认网络 ( default):外部命名为docker_streaming。

2、kafka_stream_dag.py

代码语言:javascript
复制
# Importing required modules
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from kafka_streaming_service import initiate_stream  
# Configuration for the DAG's start date
DAG_START_DATE = datetime(2018, 12, 21, 12, 12)

# Default arguments for the DAG
DAG_DEFAULT_ARGS = {
    'owner': 'airflow',
    'start_date': DAG_START_DATE,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

# Creating the DAG with its configuration
with DAG(
    'name_stream_dag',  # Renamed for uniqueness
    default_args=DAG_DEFAULT_ARGS,
    schedule_interval='0 1 * * *',
    catchup=False,
    description='Stream random names to Kafka topic',
    max_active_runs=1
) as dag:

    # Defining the data streaming task using PythonOperator
    kafka_stream_task = PythonOperator(
        task_id='stream_to_kafka_task', 
        python_callable=initiate_stream,
        dag=dag
    )

    kafka_stream_task

该文件主要定义了一个Airflow Directed Acyclic Graph(DAG),用于处理数据流到Kafka主题。

1)进口

导入基本模块和函数,特别是 Airflow DAG 和 PythonOperator,以及initiate_stream来自kafka_streaming_service.

2)配置

  • DAG 开始日期 ( DAG_START_DATE):设置 DAG 开始执行的时间。
  • 默认参数 ( DAG_DEFAULT_ARGS):配置 DAG 的基本参数,例如所有者、开始日期和重试设置。

3)DAG定义

将创建一个名为 的新 DAG name_stream_dag,配置为每天凌晨 1 点运行。它的设计目的是不运行任何错过的间隔(带有catchup=False),并且一次只允许一次活动运行。

4)任务

单个任务 kafka_stream_task 是使用 PythonOperator 定义的。此任务调用该initiate_stream函数,在 DAG 运行时有效地将数据流式传输到 Kafka。

3、kafka_streaming_service.py

代码语言:javascript
复制
# Importing necessary libraries and modules
import requests
import json
import time
import hashlib
from confluent_kafka import Producer

# Constants and configuration
API_ENDPOINT = "https://randomuser.me/api/?results=1"
KAFKA_BOOTSTRAP_SERVERS = ['kafka_broker_1:19092','kafka_broker_2:19093','kafka_broker_3:19094']
KAFKA_TOPIC = "names_topic"  
PAUSE_INTERVAL = 10  
STREAMING_DURATION = 120

def retrieve_user_data(url=API_ENDPOINT) -> dict:
    """Fetches random user data from the provided API endpoint."""
    response = requests.get(url)
    return response.json()["results"][0]

def transform_user_data(data: dict) -> dict:
    """Formats the fetched user data for Kafka streaming."""
    return {
        "name": f"{data['name']['title']}. {data['name']['first']} {data['name']['last']}",
        "gender": data["gender"],
        "address": f"{data['location']['street']['number']}, {data['location']['street']['name']}",  
        "city": data['location']['city'],
        "nation": data['location']['country'],  
        "zip": encrypt_zip(data['location']['postcode']),  
        "latitude": float(data['location']['coordinates']['latitude']),
        "longitude": float(data['location']['coordinates']['longitude']),
        "email": data["email"]
    }

def encrypt_zip(zip_code):  
    """Hashes the zip code using MD5 and returns its integer representation."""
    zip_str = str(zip_code)
    return int(hashlib.md5(zip_str.encode()).hexdigest(), 16)

def configure_kafka(servers=KAFKA_BOOTSTRAP_SERVERS):
    """Creates and returns a Kafka producer instance."""
    settings = {
        'bootstrap.servers': ','.join(servers),
        'client.id': 'producer_instance'  
    }
    return Producer(settings)

def publish_to_kafka(producer, topic, data):
    """Sends data to a Kafka topic."""
    producer.produce(topic, value=json.dumps(data).encode('utf-8'), callback=delivery_status)
    producer.flush()

def delivery_status(err, msg):
    """Reports the delivery status of the message to Kafka."""
    if err is not None:
        print('Message delivery failed:', err)
    else:
        print('Message delivered to', msg.topic(), '[Partition: {}]'.format(msg.partition()))

def initiate_stream():
    """Initiates the process to stream user data to Kafka."""
    kafka_producer = configure_kafka()
    for _ in range(STREAMING_DURATION // PAUSE_INTERVAL):
        raw_data = retrieve_user_data()
        kafka_formatted_data = transform_user_data(raw_data)
        publish_to_kafka(kafka_producer, KAFKA_TOPIC, kafka_formatted_data)
        time.sleep(PAUSE_INTERVAL)

if __name__ == "__main__":
    initiate_stream()

1)导入和配置

导入基本库并设置常量,例如 API 端点、Kafka 引导服务器、主题名称和流间隔详细信息。

2)用户数据检索

该retrieve_user_data函数从指定的 API 端点获取随机用户详细信息。

3)数据转换

该 transform_user_data 函数格式化用于 Kafka 流的原始用户数据,同时 encrypt_zip 对邮政编码进行哈希处理以维护用户隐私。

4)Kafka 配置与发布

  • configure_kafka 设置 Kafka 生产者。
  • publish_to_kafka 将转换后的用户数据发送到 Kafka 主题。
  • delivery_status 提供有关数据是否成功发送到 Kafka 的反馈。

5)主要流功能

initiate_stream 协调整个流程,定期检索、转换用户数据并将其发布到 Kafka。

6)执行

当直接运行脚本时,initiate_stream 将执行该函数,并在指定的持续时间内流式传输数据 STREAMING_DURATION。

4、spark_processing.py

代码语言:javascript
复制
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType


# Initialize logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')
logger = logging.getLogger("spark_structured_streaming")


def initialize_spark_session(app_name, access_key, secret_key):
    """
    Initialize the Spark Session with provided configurations.

    :param app_name: Name of the spark application.
    :param access_key: Access key for S3.
    :param secret_key: Secret key for S3.
    :return: Spark session object or None if there's an error.
    """
    try:
        spark = SparkSession \
                .builder \
                .appName(app_name) \
                .config("spark.hadoop.fs.s3a.access.key", access_key) \
                .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
                .getOrCreate()

        spark.sparkContext.setLogLevel("ERROR")
        logger.info('Spark session initialized successfully')
        return spark

    except Exception as e:
        logger.error(f"Spark session initialization failed. Error: {e}")
        return None


def get_streaming_dataframe(spark, brokers, topic):
    """
    Get a streaming dataframe from Kafka.

    :param spark: Initialized Spark session.
    :param brokers: Comma-separated list of Kafka brokers.
    :param topic: Kafka topic to subscribe to.
    :return: Dataframe object or None if there's an error.
    """
    try:
        df = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", brokers) \
            .option("subscribe", topic) \
            .option("delimiter", ",") \
            .option("startingOffsets", "earliest") \
            .load()
        logger.info("Streaming dataframe fetched successfully")
        return df

    except Exception as e:
        logger.warning(f"Failed to fetch streaming dataframe. Error: {e}")
        return None


def transform_streaming_data(df):
    """
    Transform the initial dataframe to get the final structure.

    :param df: Initial dataframe with raw data.
    :return: Transformed dataframe.
    """
    schema = StructType([
        StructField("full_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("location", StringType(), False),
        StructField("city", StringType(), False),
        StructField("country", StringType(), False),
        StructField("postcode", IntegerType(), False),
        StructField("latitude", FloatType(), False),
        StructField("longitude", FloatType(), False),
        StructField("email", StringType(), False)
    ])

    transformed_df = df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col("value"), schema).alias("data")) \
        .select("data.*")
    return transformed_df


def initiate_streaming_to_bucket(df, path, checkpoint_location):
    """
    Start streaming the transformed data to the specified S3 bucket in parquet format.

    :param df: Transformed dataframe.
    :param path: S3 bucket path.
    :param checkpoint_location: Checkpoint location for streaming.
    :return: None
    """
    logger.info("Initiating streaming process...")
    stream_query = (df.writeStream
                    .format("parquet")
                    .outputMode("append")
                    .option("path", path)
                    .option("checkpointLocation", checkpoint_location)
                    .start())
    stream_query.awaitTermination()


def main():
    app_name = "SparkStructuredStreamingToS3"
    access_key = "ENTER_YOUR_ACCESS_KEY"
    secret_key = "ENTER_YOUR_SECRET_KEY"
    brokers = "kafka_broker_1:19092,kafka_broker_2:19093,kafka_broker_3:19094"
    topic = "names_topic"
    path = "BUCKET_PATH"
    checkpoint_location = "CHECKPOINT_LOCATION"

    spark = initialize_spark_session(app_name, access_key, secret_key)
    if spark:
        df = get_streaming_dataframe(spark, brokers, topic)
        if df:
            transformed_df = transform_streaming_data(df)
            initiate_streaming_to_bucket(transformed_df, path, checkpoint_location)


# Execute the main function if this script is run as the main module
if __name__ == '__main__':
    main()
代码语言:javascript
复制
1. 导入和日志初始化

导入必要的库,并创建日志记录设置以更好地调试和监控。

2. Spark会话初始化

initialize_spark_session:此函数使用从 S3 访问数据所需的配置来设置 Spark 会话。

3. 数据检索与转换

  • get_streaming_dataframe:从 Kafka 获取具有指定代理和主题详细信息的流数据帧。
  • transform_streaming_data:将原始 Kafka 数据转换为所需的结构化格式。

4. 流式传输到 S3

initiate_streaming_to_bucket:此函数将转换后的数据以 parquet 格式流式传输到 S3 存储桶。它使用检查点机制来确保流式传输期间数据的完整性。

5. 主执行

该 main 函数协调整个过程:初始化 Spark 会话、从 Kafka 获取数据、转换数据并将其流式传输到 S3。

6. 脚本执行

如果脚本是正在运行的主模块,它将执行该 main 函数,启动整个流处理过程。

构建数据管道:逐步

1. 设置Kafka集群

使用以下命令启动 Kafka 集群:

代码语言:javascript
复制
docker network create docker_streaming
docker-compose -f docker-compose.yml up -d
代码语言:javascript
复制
2. 为 Kafka 创建主题(http://localhost:8888/)
  • 通过http://localhost:8888/访问 Kafka UI 。
  • 观察活动集群。
  • 导航至“主题”。
  • 创建一个名为“names_topic”的新主题。
  • 将复制因子设置为 3。

3. 配置 Airflow 用户

创建具有管理员权限的 Airflow 用户:

代码语言:javascript
复制
docker-compose run airflow_webserver airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin

4. 访问 Airflow Bash 并安装依赖项

我们应该将脚本移动kafka_stream_dag.py到文件夹下以便能够运行 DAG 使用提供的脚本访问 Airflow bash 并安装所需的软件包:kafka_streaming_service.py dags

代码语言:javascript
复制
./airflow.sh bash
pip install -r ./requirements.txt
代码语言:javascript
复制

5. 验证 DAG

确保您的 DAG 没有错误:

代码语言:javascript
复制
airflow dags list

6. 启动 Airflow 调度程序

要启动 DAG,请运行调度程序:

代码语言:javascript
复制
airflow scheduler

7. 验证数据是否上传到 Kafka 集群

  • 访问 Kafka UI:http://localhost:8888/并验证该主题的数据是否已上传

8. 传输 Spark 脚本

将 Spark 脚本复制到 Docker 容器中:

代码语言:javascript
复制
docker cp spark_processing.py spark_master:/opt/bitnami/spark/

9.启动 Spark Master 并下载 JAR

访问 Spark bash,导航到jars目录并下载必要的 JAR 文件。下载后,提交Spark作业:

代码语言:javascript
复制
docker exec -it spark_master /bin/bash
cd jars

curl -O <https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.1/kafka-clients-2.8.1.jar>
curl -O <https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.13/3.3.0/spark-sql-kafka-0-10_2.13-3.3.0.jar>
curl -O <https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar>
curl -O <https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.375/aws-java-sdk-s3-1.11.375.jar>
curl -O <https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.8.0/commons-pool2-2.8.0.jar>

cd ..


spark-submit \\
--master local[2] \\
--jars /opt/bitnami/spark/jars/kafka-clients-2.8.1.jar,\\
/opt/bitnami/spark/jars/spark-sql-kafka-0-10_2.13-3.3.0.jar,\\
/opt/bitnami/spark/jars/hadoop-aws-3.2.0.jar,\\
/opt/bitnami/spark/jars/aws-java-sdk-s3-1.11.375.jar,\\
/opt/bitnami/spark/jars/commons-pool2-2.8.0.jar \\
spark_processing.py

10. 验证S3上的数据

执行这些步骤后,检查您的 S3 存储桶以确保数据已上传

挑战和故障排除

  • 配置挑战:确保docker-compose.yaml 正确设置环境变量和配置(如文件中的)可能很棘手。不正确的设置可能会阻止服务启动或通信。
  • 服务依赖性:像 Kafka 或 Airflow 这样的服务依赖于其他服务(例如,Kafka 的 Zookeeper)。确保服务初始化的正确顺序至关重要。
  • Airflow DAG 错误:DAG 文件 ( kafka_stream_dag.py) 中的语法或逻辑错误可能会阻止 Airflow 正确识别或执行 DAG。
  • 数据转换问题:Python 脚本中的数据转换逻辑可能并不总是产生预期的结果,特别是在处理来自随机名称 API 的各种数据输入时。
  • Spark 依赖项:确保所有必需的 JAR 可用且兼容对于 Spark 的流作业至关重要。JAR 丢失或不兼容可能会导致作业失败。
  • Kafka 主题管理:使用正确的配置(如复制因子)创建主题对于数据持久性和容错能力至关重要。
  • 网络挑战:在 docker-compose.yaml 中设置的 Docker 网络必须正确地促进服务之间的通信,特别是对于 Kafka 代理和 Zookeeper。
  • S3 存储桶权限:写入 S3 时确保正确的权限至关重要。权限配置错误可能会阻止 Spark 将数据保存到存储桶。
  • 弃用警告:提供的日志显示弃用警告,表明所使用的某些方法或配置在未来版本中可能会过时。

结论:

在整个旅程中,我们深入研究了现实世界数据工程的复杂性,从原始的未经处理的数据发展到可操作的见解。从收集随机用户数据开始,我们利用 Kafka、Spark 和 Airflow 的功能来管理、处理和自动化这些数据的流式传输。Docker 简化了部署,确保了环境的一致性,而 S3 和 Python 等其他工具发挥了关键作用。

这项努力不仅仅是建造一条管道,而是理解工具之间的协同作用。我鼓励大家进一步尝试、调整和增强此流程,以满足独特的需求并发现更深刻的见解。潜心、探索、创新!

原文作者:Simardeep Singh

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-11-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据杂货铺 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 入门:先决条件和设置
  • 分解项目文件
  • 1、docker-compose.yml
  • 2、kafka_stream_dag.py
  • 3、kafka_streaming_service.py
  • 4、spark_processing.py
  • 构建数据管道:逐步
  • 1. 设置Kafka集群
  • 3. 配置 Airflow 用户
  • 4. 访问 Airflow Bash 并安装依赖项
  • 5. 验证 DAG
  • 6. 启动 Airflow 调度程序
  • 7. 验证数据是否上传到 Kafka 集群
  • 8. 传输 Spark 脚本
  • 9.启动 Spark Master 并下载 JAR
  • 挑战和故障排除
  • 结论:
相关产品与服务
容器镜像服务
容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档