我几个月来一直在玩Netflix的RxJava。反应式编程改变了我的整个编程方法。它确实带来了最好的功能编程所提供的。SQLite不喜欢反应式编程?
但是,我想使用SQLite的反应式编程。 David Moten编写了a great library来将RxJava集成到JDBC中。但SQLite似乎有一个问题。它不喜欢一个查询推送一个ResultSet
,其中每个记录迭代被转换为一个对象并驱动另一个查询。
说我有两个表
CREATE TABLE TABLE_ONE (
ID INTEGER PRIMARY KEY
NOT NULL,
VALUE INTEGER NOT NULL
);
CREATE TABLE TABLE_TWO (
ID INTEGER NOT NULL
PRIMARY KEY,
FOREIGN_ID INTEGER NOT NULL
REFERENCES TABLE_ONE ([KEY]),
VALUE INTEGER NOT NULL
);
我创建了一个单子,做一些INSERT父/母SELECT/INSERT子/ SELECT子样的操作。
import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;
import java.sql.Connection;
public final class Test {
public static void main(String[] args) {
Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get();
Database db = Database.from(con);
Observable<Integer> inputs = Observable.just(100,200,300);
db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)")
.parameters(inputs)
.returnGeneratedKeys()
.getAs(Integer.class)
.flatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
.parameter(k)
.get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
).flatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
.parameter(t1.id)
.parameter(t1.value)
.returnGeneratedKeys()
.getAs(Integer.class)
).flatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
.parameter(k)
.get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))
).subscribe(System.out::println, Throwable::printStackTrace);
db.close();
}
private static final class Type1 {
private final int id;
private final int value;
private Type1(int id, int value) {
this.id = id;
this.value = value;
}
}
private static final class Type2 {
private final int id;
private final int foreignId;
private final int value;
private Type2(int id, int foreignKey, int value) {
this.id = id;
this.foreignId = foreignKey;
this.value = value;
}
@Override
public String toString() {
return "Type2{" +
"id=" + id +
", foreignId=" + foreignId +
", value=" + value +
'}';
}
}
}
更具体地,这是这种情况发生于所有三个数字(100,200,300)的处理...
1) INSERT a TABLE_ONE record, get its primary key ID
2) SELECT that TABLE_ONE record with ID
3) Turn it into a Type1 Object
4) INSERT a TABLE_TWO record with Type1's `id for the foreign key (and get primary key ID)
5) SELECT TABLE_TWO record with ID
6) Turn it into a Type2 Object
这一切发生原子级对于每100,200,300倍的值和这个链中每个人都会发生4次更新/查询。
不过,我得到一个SQLITE_INTERRUPT
错误
java.sql.SQLException: [SQLITE_INTERRUPT] Operation terminated by sqlite3_interrupt() (interrupted)
at org.sqlite.core.DB.newSQLException(DB.java:890)
at org.sqlite.core.DB.newSQLException(DB.java:901)
at org.sqlite.core.DB.throwex(DB.java:868)
at org.sqlite.jdbc3.JDBC3ResultSet.next(JDBC3ResultSet.java:93)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:112)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestSome(QuerySelectProducer.java:75)
但第一项通过链条推动成功,插入到两个表,并打印虽然我有两个值(200和300)去。
Type2{id=1, foreignId=1, value=100}
我的理论是,由于每个发射项目O
从一个查询推到下一个,它将中断和取消以前的查询的迭代如图X
QUERY OP 4----------------------O-
QUERY OP 3----------------O-----X-
QUERY OP 2------------O---X-------
QUERY OP 1-------O----X-----------
因此发出的第一项目会经过,但它会在其后面留下一些中断的查询来驱动当前项目,因此无法让下一个项目成为onNext()
'd,因为查询迭代被终止。
SQLite和RxJava的人,你们中的任何一个人都可以想办法解决这个问题吗?是否有可以配置的SQLite设置来阻止这种中断?还是有一个RxJava构图技巧可以做到防止中断?
我还用上面的测试创建了一个简单的Git回购。 https://github.com/thomasnield/rxjava_sqlite_test
是的,如果我用RxJava的'toList()','最后()',或在进行下一阶段之前收集的所有记录一些其他的拦截运营商,那是我在讨论解决方法GitHub问题。 https://github.com/davidmoten/rxjava-jdbc/issues/45#issuecomment-150103664问题是这是反应世界中的一种反模式,并且令人不悦。我可以做这个解决方法,但希望有一个非阻塞的方式来实现这一点。 – tmn
这就是我不喜欢它的原因。但修复它需要深入改变SQLite的工作方式。它需要能够为每个查询保留数据库的副本,以便SELECT可以在COMMIT取代它时继续遍历它。这几乎等同于MVCC /快照隔离。 –
那么这并不理想。我想,使用超级轻量级技术总是有缺点,因为我们希望他们在没有任何开销的情况下完成所有任务。我将发布我的解决方案。 – tmn