2015-10-22 30 views
0

我几个月来一直在玩Netflix的RxJava。反应式编程改变了我的整个编程方法。它确实带来了最好的功能编程所提供的。SQLite不喜欢反应式编程?

但是,我想使用SQLite的反应式编程。 David Moten编写了a great library来将RxJava集成到JDBC中。但SQLite似乎有一个问题。它不喜欢一个查询推送一个ResultSet,其中每个记录迭代被转换为一个对象并驱动另一个查询。

说我有两个表

CREATE TABLE TABLE_ONE (
    ID INTEGER PRIMARY KEY 
        NOT NULL, 
    VALUE INTEGER NOT NULL 
); 

CREATE TABLE TABLE_TWO (
    ID   INTEGER NOT NULL 
         PRIMARY KEY, 
    FOREIGN_ID INTEGER NOT NULL 
         REFERENCES TABLE_ONE ([KEY]), 
    VALUE  INTEGER NOT NULL 
); 

我创建了一个单子,做一些INSERT父/母SELECT/INSERT子/ SELECT子样的操作。

import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl; 
import com.github.davidmoten.rx.jdbc.Database; 
import rx.Observable; 

import java.sql.Connection; 

public final class Test { 

    public static void main(String[] args) { 

     Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get(); 
     Database db = Database.from(con); 

     Observable<Integer> inputs = Observable.just(100,200,300); 

     db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)") 
       .parameters(inputs) 
       .returnGeneratedKeys() 
       .getAs(Integer.class) 
       .flatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?") 
           .parameter(k) 
           .get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE"))) 
       ).flatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)") 
           .parameter(t1.id) 
           .parameter(t1.value) 
           .returnGeneratedKeys() 
           .getAs(Integer.class) 
       ).flatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?") 
           .parameter(k) 
           .get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE"))) 
       ).subscribe(System.out::println, Throwable::printStackTrace); 

     db.close(); 
    } 
    private static final class Type1 { 
     private final int id; 
     private final int value; 
     private Type1(int id, int value) { 
      this.id = id; 
      this.value = value; 
     } 
    } 
    private static final class Type2 { 
     private final int id; 
     private final int foreignId; 
     private final int value; 
     private Type2(int id, int foreignKey, int value) { 
      this.id = id; 
      this.foreignId = foreignKey; 
      this.value = value; 
     } 
     @Override 
     public String toString() { 
      return "Type2{" + 
        "id=" + id + 
        ", foreignId=" + foreignId + 
        ", value=" + value + 
        '}'; 
     } 
    } 
} 

更具体地,这是这种情况发生于所有三个数字(100,200,300)的处理...

1) INSERT a TABLE_ONE record, get its primary key ID 
2) SELECT that TABLE_ONE record with ID 
3) Turn it into a Type1 Object 
4) INSERT a TABLE_TWO record with Type1's `id for the foreign key (and get primary key ID) 
5) SELECT TABLE_TWO record with ID 
6) Turn it into a Type2 Object 

这一切发生原子级对于每100,200,300倍的值和这个链中每个人都会发生4次更新/查询。

不过,我得到一个SQLITE_INTERRUPT错误

java.sql.SQLException: [SQLITE_INTERRUPT] Operation terminated by sqlite3_interrupt() (interrupted) 
    at org.sqlite.core.DB.newSQLException(DB.java:890) 
    at org.sqlite.core.DB.newSQLException(DB.java:901) 
    at org.sqlite.core.DB.throwex(DB.java:868) 
    at org.sqlite.jdbc3.JDBC3ResultSet.next(JDBC3ResultSet.java:93) 
    at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:112) 
    at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestSome(QuerySelectProducer.java:75) 

但第一项通过链条推动成功,插入到两个表,并打印虽然我有两个值(200和300)去。

Type2{id=1, foreignId=1, value=100} 

我的理论是,由于每个发射项目O从一个查询推到下一个,它将中断和取消以前的查询的迭代如图X

QUERY OP 4----------------------O- 
QUERY OP 3----------------O-----X- 
QUERY OP 2------------O---X------- 
QUERY OP 1-------O----X----------- 

因此发出的第一项目会经过,但它会在其后面留下一些中断的查询来驱动当前项目,因此无法让下一个项目成为onNext()'d,因为查询迭代被终止。

SQLite和RxJava的人,你们中的任何一个人都可以想办法解决这个问题吗?是否有可以配置的SQLite设置来阻止这种中断?还是有一个RxJava构图技巧可以做到防止中断?

我还用上面的测试创建了一个简单的Git回购。 https://github.com/thomasnield/rxjava_sqlite_test

回答

1

似乎与使用SQLite的事实有关,您不能拥有一个优秀的游标来说一个SELECT并且同时也是COMMIT,这在默认自动提交中的INSERT和UPDATE之后是隐含的模式。这可以防止简单地通过游标循环遍历SELECT所产生的行并执行INSERT。解决的办法是简单地获取所有结果,然后遍历它们。我不喜欢它。

我不知道该库,但看了一些部分后,我确定它使用游标懒洋洋地获取结果,这是什么导致的问题。你可以通过强制它获取每个SELECT的所有结果来解决这个问题。

+0

是的,如果我用RxJava的'toList()','最后()',或在进行下一阶段之前收集的所有记录一些其他的拦截运营商,那是我在讨论解决方法GitHub问题。 https://github.com/davidmoten/rxjava-jdbc/issues/45#issuecomment-150103664问题是这是反应世界中的一种反模式,并且令人不悦。我可以做这个解决方法,但希望有一个非阻塞的方式来实现这一点。 – tmn

+0

这就是我不喜欢它的原因。但修复它需要深入改变SQLite的工作方式。它需要能够为每个查询保留数据库的副本,以便SELECT可以在COMMIT取代它时继续遍历它。这几乎等同于MVCC /快照隔离。 –

+0

那么这并不理想。我想,使用超级轻量级​​技术总是有缺点,因为我们希望他们在没有任何开销的情况下完成所有任务。我将发布我的解决方案。 – tmn

1

虽然这绝对不是理想,但丹的解释似乎证实了我所担心的。幸运的是有办法解决这个问题。即使用于此目的的toList()等运营商对Rx有些反模式,它也远比不使用Rx好。

这里我用toList().concatMap(Observable::from)来收集所有的排放物,并在每个阶段之间再次将它们弄平。

import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl; 
import com.github.davidmoten.rx.jdbc.Database; 
import rx.Observable; 

import java.sql.Connection; 

public final class Test { 

    public static void main(String[] args) { 

     Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get(); 
     Database db = Database.from(con); 

     Observable<Integer> inputs = Observable.just(100,200,300); 

     db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)") 
       .parameters(inputs) 
       .returnGeneratedKeys() 
       .getAs(Integer.class) 
       .toList().concatMap(Observable::from) 
       .concatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?") 
           .parameter(k) 
           .get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE"))) 
       ) 
       .toList().concatMap(Observable::from) 
       .concatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)") 
           .parameter(t1.id) 
           .parameter(t1.value) 
           .returnGeneratedKeys() 
           .getAs(Integer.class) 
       ).toList().concatMap(Observable::from) 
       .concatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?") 
           .parameter(k) 
           .get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE"))) 
       ).subscribe(System.out::println, Throwable::printStackTrace); 

     db.close(); 
    } 
    private static final class Type1 { 
     private final int id; 
     private final int value; 
     private Type1(int id, int value) { 
      this.id = id; 
      this.value = value; 
     } 
    } 
    private static final class Type2 { 
     private final int id; 
     private final int foreignId; 
     private final int value; 
     private Type2(int id, int foreignKey, int value) { 
      this.id = id; 
      this.foreignId = foreignKey; 
      this.value = value; 
     } 
     @Override 
     public String toString() { 
      return "Type2{" + 
        "id=" + id + 
        ", foreignId=" + foreignId + 
        ", value=" + value + 
        '}'; 
     } 
    } 
}