Apache Beam是一个用于处理大规模数据流的开源框架,Google Cloud Dataflow则是基于Apache Beam的托管服务。使用Python可以轻松地创建自定义模板来处理数据流。(apache bean)(apache bean)

访客 223 0

dataflow

阿帕奇光束 (Apache Beam)

Apache Beam(批处理+流处理)是一个统一的编程模型,用于定义和执行批处理和流式数据处理作业。它提供了用于运行数据管道的SDK,并提供执行器来执行这些作业。

Apache Beam是一个统一的编程模型,用于定义和执行批处理和流数据处理作业。它提供了一个SDK,用于运行数据管道和程序。

Apache Beam can add significant value in scenarios that encompass the transfer of data across various storage layers, the manipulation of data, and the execution of real-time data processing tasks.

Apache Beam在涉及数据移动、数据转换和实时数据处理作业的用例中,能够提供价值,并且适用于不同存储层。

In Apache Beam, three core concepts prevail, namely:

Apache Beam包含三个基本概念,分别是:

  • Pipeline — encompasses all data processing tasks and embodies a directed acyclic graph (DAG) of PCollection and PTransform. It bears resemblance to the Spark Context.

    管道是一个封装了整个数据处理任务的工具,它表示了PCollection和PTransform的有向无环图(DAG)。可以将其类比为Spark Context。

  • PCollection — represents a dataset that can either be a fixed batch or a continuous stream of data. It can be considered as an equivalent to Spark RDD.

    PCollection 是一个数据集合,可以是固定的批处理数据或者流式数据。我们可以将其类比为 Spark 的 RDD。

  • PTransform是一种数据处理操作,它接收一个或多个PCollection,并输出零个或多个PCollection。可以将其视为对RDD进行Spark转换/操作以输出结果。

    PTransform是一种数据处理操作,它接收一个或多个PCollection,并输出零个或多个PCollection。可以将其类比为Spark上的RDD转换/操作,用于生成结果。

Apache Beam is purposefully designed to ensure that pipelines can be seamlessly transferred between various runners. In the following illustration, the pipeline is executed locally using the DirectRunner, an excellent choice for development, testing, and debugging purposes.

Apache Beam的目标是实现管道的可移植性,使其能够在不同的运行程序之间进行迁移。下面的示例展示了如何使用DirectRunner在本地执行管道,这对于开发、测试和调试非常方便。

WordCount Example(“Bigdata Hello World”):

示例:WordCount(“Bigdata Hello World”)

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = p | 'Creating PCollection' >> beam.Create(['Hello', 'Hello Good Morning', 'GoodBye'])
    counts = (
        lines
        | ...
```

请提供缺失的代码部分,以便我可以完整地改写这段话。      

让我们简单回顾一下代码的功能。

The code beam.Pipeline(options=PipelineOptions()) creates a beam pipeline by accepting the provided configuration options.

使用`beam.Pipeline`函数来创建光束管道,该函数接受一个配置选项作为参数。

beam.Create generates a PCollection by utilizing the provided data.

使用beam.Create从数据源创建一个PCollection。

The beam.FlatMap operation is applied to every element of the PCollection, breaking down each line into individual words.

beam.FlatMap函数适用于对每个元素进行处理,并将每行拆分为单词。

beam.Map- a transformation operation that associates each word with the value (word,1)

beam.Map将每个单词映射为(word, 1)的转换操作。

The beam.CombinePerKey operation is similar to the groupbykey operation, as it combines and calculates the total count for each word.

beam.CombinePerKey类似于groupbykey操作,它将每个单词的计数求和。

beam.ParDo- another transformation operation which is applied on key-value pair and prints the final result.

beam.ParDo是一种用于键-值对的转换操作,它能够展示最终结果。

Google Cloud Dataflow (Google Cloud Dataflow)

Cloud Dataflow is a comprehensive service provided by Google Cloud Platform, which allows for the seamless execution of Apache Beam pipelines. With Cloud Dataflow, data processing jobs can be effortlessly carried out. Specifically designed to handle massive datasets, Dataflow efficiently distributes these processing tasks across multiple virtual machines in the cluster to ensure smooth and efficient data processing.

Cloud Dataflow是一项完全托管的服务,专为在Google Cloud Platform上运行Apache Beam管道而设计。它能够高效执行数据处理作业,并且能够处理非常大的数据集。通过将任务分配给集群中的多个虚拟机,Dataflow可以实现并行处理不同的数据块。尽管在Google Cloud Platform上有其他大数据处理引擎可供选择,例如在Google Cloud Dataproc Service中运行Apache Spark等替代方法,但Dataflow仍然具有其独特之处和优势。那么为什么要选择Dataflow呢?以下是几个原因:

无服务器计算:我们无需管理计算资源。它会自动在运行处理作业时启动和关闭一组虚拟机群集。我们只需专注于构建代码,而不是构建群集。然而,Apache Spark即使在Cloud Dataproc上运行,仍需要更多的配置工作。”

无服务器:我们无需管理计算资源,当运行处理作业时,它会自动旋转虚拟机集群。这样我们可以专注于构建代码而不是构建集群。相比之下,即使在Cloud Dataproc上运行Apache Spark也需要更多的配置。

Processing code is distinct from the execution environment. In 2016, Google contributed an open-source Dataflow SDK and a collection of data connectors for accessing the Google Cloud Platform, enhancing the capabilities of the Apache Beam project. With this, we have the flexibility to write beam programs and execute them either on our local system or through the Cloud Dataflow service. The Dataflow documentation directs us to visit the Apache Beam website for obtaining the most up-to-date version of their Software Development Kit.

在2016年,Google向Apache Beam项目捐赠了开源的Dataflow SDK和一系列数据连接器,以便访问Google Cloud Platform并为该项目增加其他功能。这样一来,我们可以编写Beam程序,并在本地系统或Cloud Dataflow服务上运行它们。当我们查看Dataflow文档时,建议从Apache Beam网站获取最新版本的软件开发套件来使用。

“通过同一编程模型处理批处理和流式模式:其他大数据SDK根据数据是批处理还是流式形式需要不同的代码。而Apache Beam通过统一的编程模型解决了这个问题。竞争对手如Spark也在考虑这个问题,但他们还没有完全实现。”

处理批处理和流模式时,其他大数据SDK需要编写不同的代码来适应不同的数据形式,即批处理或流处理。然而,Apache Beam采用了统一的编程模型来解决这个问题。尽管像Spark这样的竞争对手也在考虑类似的做法,但他们目前还没有实现这一点。

使用Python创建自定义模板 (Creating a Custom template using Python)

The main objective of the templates is to encapsulate the dataflow pipelines as reusable components, allowing for easy customization by modifying only the necessary pipeline parameters. These template files are stored in a GCS bucket and can be executed from various sources such as the console, gcloud command, or other services like Cloud Scheduler/Functions, etc.

模板的主要目标是将数据流管道以可重用组件的形式打包,只需更改所需的管道参数。模板文件存储在GCS存储桶中,并可以通过控制台、gcloud命令或其他服务(如Cloud Scheduler / Functions等)启动。

Allow us to delve into an illustration of how to transfer data from Google Cloud Storage to BigQuery using the Cloud Dataflow Python SDK. Subsequently, we will demonstrate the creation of a personalized template that can accept a runtime parameter.

让我们来探索一个案例,这个案例使用Cloud Dataflow Python SDK将数据从Google Cloud Storage传输到Bigquery,并且创建一个能够接受运行时参数的自定义模板。

Google已经为我们提供了许多使用Python SDK运行Dataflow作业的示例,这里可以找到很多示例:https://github.com/GoogleCloudPlatform/DataflowTemplates。

Google已经为我们提供了许多使用python SDK运行数据流作业的示例,并且可以在这里找到许多示例https://github.com/GoogleCloudPlatform/DataflowTemplates

I am currently utilizing PyCharm with Python 3.7, and I have successfully installed all the necessary packages to execute Apache Beam (2.22.0) locally.

我正在使用Python 3.7与PyCharm,并已成功安装了所有必要的程序包,以便在本地运行Apache Beam(2.22.0)。

A CSV file has been uploaded to the GCS bucket. This file serves as a sample of the publicly available dataset called USA Names which is hosted on Bigquery. It encompasses all names derived from Social Security card applications for births that took place in the United States after 1879.

CSV文件已成功上传至GCS存储桶。该文件是托管在BigQuery上的公共数据集“USA Names”的样本,包含了自1879年以来在美国出生并申请社会保险卡的所有人的姓名。

Apache Beam是一个用于处理大规模数据流的开源框架,Google Cloud Dataflow则是基于Apache Beam的托管服务。使用Python可以轻松地创建自定义模板来处理数据流。(apache bean)(apache bean)-第1张图片-谷歌商店上架

在PyCharm中,可以通过设置环境变量GOOGLE_APPLICATION_CREDENTIAL来对应用程序代码进行身份验证,具体设置步骤如下。

为了对应用程序代码进行身份验证,需要在PyCharm中设置环境变量GOOGLE_APPLICATION_CREDENTIAL。可以按照以下步骤设置该环境变量。

Apache Beam是一个用于处理大规模数据流的开源框架,Google Cloud Dataflow则是基于Apache Beam的托管服务。使用Python可以轻松地创建自定义模板来处理数据流。(apache bean)(apache bean)-第2张图片-谷歌商店上架 The downloaded key file path should be set up as an env variable in PyCharm. 下载的密钥文件路径应在PyCharm中设置为env变量。

The admin IAM role was assigned to the service account. If you prefer not to grant an owner role to your service account, please consult the access controls guide for dataflow on this page: (https://cloud.google.com/dataflow/docs/concepts/access-control).

服务帐户已被授予管理员IAM角色。如果您不想让服务帐户拥有所有者角色,请查阅数据流访问控制指南中的此页面(https://cloud.google.com/dataflow/docs/concepts/access-control)。

以下代码用于将数据从GCS存储桶传输到BigQuery,可以作为模板运行,也可以在本地环境中通过删除模板位置选项来运行。它使用beam.io.ReadFromText读取GCS位置中的文件,将元素映射为BigQuery行,并使用beam.io.BigquerySink将其写入BigQuery。

下面的代码可以用来将数据从GCS存储桶传输到Bigquery,并且可以作为模板或在本地环境中运行,通过删除模板位置选项。它使用beam.io.ReadFromText在GCS位置读取文件,并将其映射为Bigquery行,然后使用beam.io.BigquerySink将其写入Bigquery。

p = beam.Pipeline(options=pipeline_options_val)
data_ingestion = dataingestion()(p | 'Read from a File' >> beam.io.ReadFromText(pipeline_options['input'], skip_header_lines=1)
 | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                pipeline_options['output'],
                schema='state:STRING,gender:STRING,year:STRING,name:STRING,number:STRING,created_dat'))      

现在,我们将利用Bigquery数据创建自定义模板,以计算样本数据中的男性和女性人数。我们将通过传递参数F和M来运行数据流。数据流模板使用运行时参数来接受仅在管道执行期间可用的值。为了定制化管道的执行过程,我们可以将这些参数传递给在管道内运行的函数(例如DoFn)。

我们正在使用beam.io.Read(beam.io.BigquerySource())从Bigquery中选择性别列。使用Beam.ParDo来根据在运行时传递的值对元素进行过滤,该值是通过dataflow添加参数选项传递的。然后,剩余的PCollection使用beam.combiners.Count.Globally()进行全局合并以获取计数。

我们正在使用beam.io.Read(beam.io.BigquerySource())从Bigquery中选择性别列。通过Beam.ParDo,我们可以在运行时使用dataflow add parameters选项传递的值来过滤元素。然后,我们使用beam.combiners.Count.Globally()全局合并其余的PCollection以获取计数。

A template file is generated at the specified GCS location, which was provided as a command-line parameter.

已在GCS位置成功创建了一个模板文件,该位置已通过命令行参数传递。

It is now time to initiate the pipeline.

是时候启动管道了。

在数据流界面上点击从模板创建作业,输入所需的详细信息,并按照下方图像中显示的方式传递运行时参数。我忽略了下方要求创建元数据文件以验证参数的警告。可以在与模板相同的文件夹中使用名称为<template_name>_metadata来创建元数据文件。

在数据流用户界面中,点击从模板创建作业,然后输入所需的详细信息,并传递运行时参数,如下图所示。我忽略了以下警告,该警告要求创建一个用于验证参数的元数据文件。您可以在与名为<template_name> _metadata的模板相同的文件夹中创建元数据文件。

Apache Beam是一个用于处理大规模数据流的开源框架,Google Cloud Dataflow则是基于Apache Beam的托管服务。使用Python可以轻松地创建自定义模板来处理数据流。(apache bean)(apache bean)-第3张图片-谷歌商店上架 passing the runtime parameter 传递运行时参数

We can also run the dataflow job using the gcloud command.

我们还可以通过使用gcloud命令来执行数据流作业。

运行以下命令以启动gcloud dataflow作业:gcloud dataflow jobs run <job-name> --gcs-location <gsc-template-location> --region <region> --staging-location <temp-gcs-location>,并使用参数filter_val=<filter-value>来设置过滤值。      

通过结合Apache Beam和Cloud Dataflow,我们能够将注意力集中在管道的逻辑组成上,而不是并行处理的物理编排。这种组合提供了有用的抽象层,使我们能够与分布式处理的低级细节隔离开来,并为企业提供了提高生产力的独特机会。

Today's update concludes here. For the complete code, please visit my Github repository at: https://github.com/ankitakundra/GoogleCloudDataflow.

今天就这些。您可以在我的Github上查看完整的代码:https://github.com/ankitakundra/GoogleCloudDataflow

感谢阅读。请随时通过评论区或者LinkedIn(https://www.linkedin.com/in/ankita-kundra-77024899/)与我联系。我乐于参与任何讨论,欢迎提供建设性的批评意见。谢谢!

感谢您的阅读!如果您有任何问题或想与我交流,欢迎在评论部分留言或通过我的LinkedIn页面(https://www.linkedin.com/in/ankita-kundra-77024899/)与我联系。我非常欢迎任何讨论和建设性的批评。再次感谢您!

来源:https://medium.com/swlh/apache-beam-google-cloud-dataflow-and-creating-custom-templates-using-python-c666c151b4bc 改写为: 引自Medium文章《Apache Beam、Google Cloud Dataflow和使用Python创建自定义模板》

dataflow

标签: 谷歌商店上架 数据流 数据

发表评论 (已有0条评论)

还木有评论哦,快来抢沙发吧~