如果标题过于模糊,我很抱歉,但我无法正确说出它。在Apache Spark中加入流式数据
所以基本上我想弄清楚Apache Spark和Apache Kafka是否能够将数据从我的关系数据库同步到Elasticsearch。
我的计划是使用Kafka连接器之一从RDBMS中读取数据并将其推入到Kafka主题中。那将是模型和DDL的ERD。很基本的,Report
和Product
表有许多到许多存在于ReportProduct
表关系:
CREATE TABLE dbo.Report (
ReportID INT NOT NULL PRIMARY KEY,
Title NVARCHAR(500) NOT NULL,
PublishedOn DATETIME2 NOT NULL);
CREATE TABLE dbo.Product (
ProductID INT NOT NULL PRIMARY KEY,
ProductName NVARCHAR(100) NOT NULL);
CREATE TABLE dbo.ReportProduct (
ReportID INT NOT NULL,
ProductID INT NOT NULL,
PRIMARY KEY (ReportID, ProductID),
FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID),
FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID));
INSERT INTO dbo.Report (ReportID, Title, PublishedOn)
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28');
INSERT INTO dbo.Product (ProductID, ProductName)
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product');
INSERT INTO dbo.ReportProduct (ReportID, ProductID)
VALUES (1, 1), (1, 2), (1, 3), (1, 4);
SELECT *
FROM dbo.Report AS R
INNER JOIN dbo.ReportProduct AS RP
ON RP.ReportID = R.ReportID
INNER JOIN dbo.Product AS P
ON P.ProductID = RP.ProductID;
我的目标是用以下结构转变成文档这样的:
{
"ReportID":1,
"Title":"Yet Another Apache Spark StackOverflow question",
"PublishedOn":"2017-09-12T19:15:28+00:00",
"Product":[
{
"ProductID":1,
"ProductName":"Apache"
},
{
"ProductID":2,
"ProductName":"Spark"
},
{
"ProductID":3,
"ProductName":"StackOverflow"
},
{
"ProductID":4,
"ProductName":"Random product"
}
]
}
report.join(
report_product.join(product, "product_id")
.groupBy("report_id")
.agg(
collect_list(struct("product_id", "product_name")).alias("product")
), "report_id").show
:我使用的静态数据,我已在本地嘲笑了能够形成这样一种结构
但是我意识到这太基本了,流会变得更加复杂。
数据被不规则地改变(主要在每周),报告和他们的产品正在不断地变化,产品在一段时间换一次。
我想复制到这些表中发生的Elasticsearch的任何类型的变化。
这听起来真不错流卡夫卡话题从上方插入Elasticsearch。从我之前所做的研究,卡夫卡不会让你在这可能是我的情况不分区键加入。 KSQL解决了这个问题吗? –
您可以使用KSQL轻松重新分区,我认为这将解决此问题。我还没有尝试过。 –