0

如果标题过于模糊,我很抱歉,但我无法正确说出它。在Apache Spark中加入流式数据

所以基本上我想弄清楚Apache Spark和Apache Kafka是否能够将数据从我的关系数据库同步到Elasticsearch。

我的计划是使用Kafka连接器之一从RDBMS中读取数据并将其推入到Kafka主题中。那将是模型和DDL的ERD。很基本的,ReportProduct表有许多到许多存在于ReportProduct表关系: ERD

CREATE TABLE dbo.Report (
    ReportID INT NOT NULL PRIMARY KEY, 
    Title NVARCHAR(500) NOT NULL, 
    PublishedOn DATETIME2 NOT NULL); 

CREATE TABLE dbo.Product (
    ProductID INT NOT NULL PRIMARY KEY, 
    ProductName NVARCHAR(100) NOT NULL); 

CREATE TABLE dbo.ReportProduct (
    ReportID INT NOT NULL, 
    ProductID INT NOT NULL, 
    PRIMARY KEY (ReportID, ProductID), 
    FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID), 
    FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID)); 

INSERT INTO dbo.Report (ReportID, Title, PublishedOn) 
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28'); 

INSERT INTO dbo.Product (ProductID, ProductName) 
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product'); 

INSERT INTO dbo.ReportProduct (ReportID, ProductID) 
VALUES (1, 1), (1, 2), (1, 3), (1, 4); 

SELECT * 
FROM dbo.Report AS R 
INNER JOIN dbo.ReportProduct AS RP 
    ON RP.ReportID = R.ReportID 
INNER JOIN dbo.Product AS P 
    ON P.ProductID = RP.ProductID; 

我的目标是用以下结构转变成文档这样的:

{ 
    "ReportID":1, 
    "Title":"Yet Another Apache Spark StackOverflow question", 
    "PublishedOn":"2017-09-12T19:15:28+00:00", 
    "Product":[ 
    { 
     "ProductID":1, 
     "ProductName":"Apache" 
    }, 
    { 
     "ProductID":2, 
     "ProductName":"Spark" 
    }, 
    { 
     "ProductID":3, 
     "ProductName":"StackOverflow" 
    }, 
    { 
     "ProductID":4, 
     "ProductName":"Random product" 
    } 
    ] 
} 

report.join(
    report_product.join(product, "product_id") 
    .groupBy("report_id") 
    .agg(
     collect_list(struct("product_id", "product_name")).alias("product") 
    ), "report_id").show 
:我使用的静态数据,我已在本地嘲笑了能够形成这样一种结构

但是我意识到这太基本了,流会变得更加复杂。

数据被不规则地改变(主要在每周),报告和他们的产品正在不断地变化,产品在一段时间换一次。

我想复制到这些表中发生的Elasticsearch的任何类型的变化。

回答

1
  1. 卡夫卡连接到从源数据库提取数据 - 可以使用JDBC Source它可为Confluent Platform(或separately)部分,也可以调查kafka-connect-cdc-mssql

  2. 一旦你”我们已经获得了卡夫卡的数据,使用Kafka Streams API来根据需要操纵数据,或者查看最新发布的KSQL。你的选择会之类的东西你偏爱在类似SQL的环境(KSQL)在Java编码(卡夫卡流)或操纵数据来驱动。无论如何,这两个的输出将成为另一个卡夫卡话题。

  3. 最后,使用Elasticsearch卡夫卡连接插件(可here,或为Confluent Platform的一部分)

+0

这听起来真不错流卡夫卡话题从上方插入Elasticsearch。从我之前所做的研究,卡夫卡不会让你在这可能是我的情况不分区键加入。 KSQL解决了这个问题吗? –

+0

您可以使用KSQL轻松重新分区,我认为这将解决此问题。我还没有尝试过。 –