前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rxjava2_Flowable_Sqlite_Android数据库访问实例

Rxjava2_Flowable_Sqlite_Android数据库访问实例

作者头像
砸漏
发布2020-11-02 22:42:14
6690
发布2020-11-02 22:42:14
举报
文章被收录于专栏:恩蓝脚本

一、使用Rxjava访问数据库的优点:

1.随意的线程控制,数据库操作在一个线程,返回数据处理在ui线程

2.随时订阅和取消订阅,而不必再使用回调函数

3.对读取的数据用rxjava进行过滤,流式处理

4.使用sqlbrite可以原生返回rxjava的格式,同时是响应式数据库框架

(有数据添加和更新时自动调用之前订阅了的读取函数,达到有数据添加自动更新ui的效果,

同时这个特性没有禁止的方法,只能通过取消订阅停止这个功能,对于有的框架这反而是一种累赘)

二、接下来之关注实现过程:

本次实现用rxjava2的Flowable,有被压支持(在不需要被压支持的情况建议使用Observable)

实现一个稳健的的可灵活切换其他数据库的结构,当然是先定义数据库访问接口。然后跟具不同的数据库实现接口的方法

定义接口:(对于update,delete,insert,可以选择void类型,来简化调用代码,但缺少了执行结果判断)

代码语言:javascript
复制
public interface DbSource { 
  //String sql = "insert into table_task (tid,startts) values(tid,startts)"; 
  Flowable<Boolean  insertNewTask(int tid, int startts); 
 
  //String sql = "select * from table_task"; 
  Flowable<List<TaskItem   getAllTask(); 
 
  //String sql = "select * from table_task where endts = 0"; 
  Flowable<Optional<TaskItem   getRunningTask(); 
 
  //String sql = "update table_task set isuploadend=isuploadend where tid=tid"; 
  Flowable<Boolean  markUploadEnd(int tid, boolean isuploadend); 
 
  //String sql = "delete from table_task where tid=tid and endts 0"; 
  Flowable<Boolean  deleteTask(int tid); 
} 

三、用Android原生的Sqlite实现数据库操作

代码语言:javascript
复制
public class SimpleDb implements DbSource { 
private static SimpleDb sqlite; 
private SqliteHelper sqliteHelper; 
private SimpleDb(Context context) { 
this.sqliteHelper = new SqliteHelper(context); 
} 
public static synchronized SimpleDb getInstance(Context context) { 
if (sqlite == null ) 
sqlite = new SimpleDb(context); 
return sqlite; 
} 
Flowable<Boolean  insertNewTask(int tid, int startts) { 
return Flowable.create(new FlowableOnSubscribe<Boolean () { 
@Override 
public void subscribe(FlowableEmitter<Boolean  e) throws Exception { 
//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法 
ContentValues values = new ContentValues(); 
values.put(“tid”, 1); 
values.put(“startts”,13233); 
if(sqliteHelper.getWriteableDatabase().insert(TABLE_NAME, null, values) != -1) 
e.onNext(true); 
else 
e.onNext(false); 
e.onComplete(); 
} 
}, BackpressureStrategy.BUFFER); 
} 
Flowable<List<TaskItem   getAllTask() { 
return Flowable.create(new FlowableOnSubscribe<List<TaskItem  () { 
@Override 
public void subscribe(FlowableEmitter<List<TaskItem   e) throws Exception { 
List<TaskItem  taskList = new ArrayList< (); 
StringBuilder sql = new StringBuilder(100); 
sql.append("select * from "); 
sql.append(SqliteHelper.TABLE_NAME_TASK); 
SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase(); 
Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null); 
if (cursor.moveToFirst()) { 
int count = cursor.getCount(); 
for (int a = 0; a < count; a ++) { 
TaskItem item = new TaskItem(); 
item.setTid(cursor.getInt(1)); 
item.setStartts(cursor.getInt(2)); 
item.setEndts(cursor.getInt(3)); 
taskList.add(item); 
cursor.move(1); 
} 
} 
cursor.close(); 
sqLiteDatabase.close(); 
e.onNext(taskList); 
e.onComplete(); 
} 
}, BackpressureStrategy.BUFFER); 
} 
Flowable<Optional<TaskItem   getRunningTask() { 
return Flowable.create(new FlowableOnSubscribe<Optional<TaskItem  () { 
@Override 
public void subscribe(FlowableEmitter<Optional<TaskItem   e) throws Exception { 
TaskItem item = null; 
StringBuilder sql = new StringBuilder(100); 
sql.append("select * from "); 
sql.append(SqliteHelper.TABLE_NAME_TASK); 
sql.append(" where endts=0 limit 1"); 
SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase(); 
Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null); 
if (cursor.moveToFirst()) { 
int count = cursor.getCount(); 
if (count == 1) { 
item = new TaskItem(); 
item.setId(cursor.getInt(0)); 
item.setTid(cursor.getInt(1)); 
item.setStartts(cursor.getInt(2)); 
item.setEndts(cursor.getInt(3)); 
} 
} 
cursor.close(); 
sqLiteDatabase.close(); 
e.onNext(Optional.fromNullable(item)); //import com.google.common.base.Optional;//安全检查,待会看调用的代码,配合rxjava很好 
e.onComplete(); 
} 
}, BackpressureStrategy.BUFFER); 
} 
Flowable<Boolean  markUploadEnd(int tid, boolean isuploadend) { 
return Flowable.create(new FlowableOnSubscribe<Boolean () { 
@Override 
public void subscribe(FlowableEmitter<Boolean  e) throws Exception { 
//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法 
//数据库操作代码 
e.onNext(false);//返回结果 
e.onComplete();//返回结束 
} 
}, BackpressureStrategy.BUFFER); 
} 
Flowable<Boolean  deleteTask(int tid) { 
return Flowable.create(new FlowableOnSubscribe<Boolean () { 
@Override 
public void subscribe(FlowableEmitter<Boolean  e) throws Exception { 
//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法 
//数据库操作代码 
e.onNext(false);//返回结果 
e.onComplete();//返回结束 
} 
}, BackpressureStrategy.BUFFER); 
} 
} 

四、同一个接口使用sqlbrite的实现方式

代码语言:javascript
复制
public class BriteDb implements DbSource { 
@NonNull 
protected final BriteDatabase mDatabaseHelper; 
@NonNull 
private Function<Cursor, TaskItem  mTaskMapperFunction; 
@NonNull 
private Function<Cursor, PoiItem  mPoiMapperFunction; 
@NonNull 
private Function<Cursor, InterestPoiItem  mInterestPoiMapperFunction; 
// Prevent direct instantiation. 
private BriteDb(@NonNull Context context) { 
DbHelper dbHelper = new DbHelper(context); 
SqlBrite sqlBrite = new SqlBrite.Builder().build(); 
mDatabaseHelper = sqlBrite.wrapDatabaseHelper(dbHelper, Schedulers.io(); 
mTaskMapperFunction = this::getTask; 
mPoiMapperFunction = this::getPoi; 
mInterestPoiMapperFunction = this::getInterestPoi; 
} 
@Nullable 
private static BriteDb INSTANCE; 
public static BriteDb getInstance(@NonNull Context context) { 
if (INSTANCE == null) { 
INSTANCE = new BriteDb(context); 
} 
return INSTANCE; 
} 
@NonNull 
private TaskItem getTask(@NonNull Cursor c) { 
TaskItem item = new TaskItem(); 
item.setId(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ID))); 
item.setTid(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_TID))); 
item.setStartts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS))); 
item.setEndts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS))); 
return item; 
} 
@Override 
public void insertNewTask(int tid, int startts) { 
ContentValues values = new ContentValues(); 
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_TID, tid); 
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS, startts); 
mDatabaseHelper.insert(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, SQLiteDatabase.CONFLICT_REPLACE); 
} 
@Override 
public Flowable<List<TaskItem   getAllTask() { 
String sql = String.format("SELECT * FROM %s", PersistenceContract.TaskEntry.TABLE_NAME_TASK);//TABLE_NAME_TASK表的名字字符串 
return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql) 
.mapToList(mTaskMapperFunction) 
.toFlowable(BackpressureStrategy.BUFFER); 
} 
@Override 
public Flowable<Optional<TaskItem   getRunningTask() { 
String sql = String.format("SELECT * FROM %s WHERE %s = ? limit 1", 
PersistenceContract.TaskEntry.TABLE_NAME_TASK, PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS); 
return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql, "0") 
.mapToOne(cursor -  Optional.fromNullable(mTaskMapperFunction.apply(cursor))) 
.toFlowable(BackpressureStrategy.BUFFER); 
} 
@Override 
public Flowable<Boolean  markUploadEnd(int tid, boolean isuploadend) { 
return Flowable.create(new FlowableOnSubscribe<Boolean () { 
@Override 
public void subscribe(FlowableEmitter<Boolean  e) throws Exception { 
ContentValues values = new ContentValues(); 
if(isuploadend) { 
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 1); 
} else { 
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 0); 
} 
String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID + " = ?"; 
//String[] selectionArgs = {String.valueOf(tid)}; 
String selectionArgs = String.valueOf(tid); 
int res = mDatabaseHelper.update(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, selection, selectionArgs); 
if (res   0) { 
e.onNext(true);//返回结果 
} else { 
e.onNext(false);//返回结果 
} 
e.onComplete();//返回结束 
} 
}, BackpressureStrategy.BUFFER); 
} 
@Override 
public Flowable<Boolean  deleteTask(int tid) { 
return Flowable.create(new FlowableOnSubscribe<Boolean () { 
@Override 
public void subscribe(FlowableEmitter<Boolean  e) throws Exception { 
String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID + " = ? AND "+ 
PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS + "   0"; 
String[] selectionArgs = new String[1]; 
selectionArgs[0] = String.valueOf(tid); 
int res = mDatabaseHelper.delete(PersistenceContract.TaskEntry.TABLE_NAME_TASK, selection, selectionArgs); 
if (res   0) { 
e.onNext(true);//返回结果 
} else { 
e.onNext(false);//返回结果 
} 
e.onComplete();//返回结束 
} 
}, BackpressureStrategy.BUFFER); 
} 
} 

五、数据库调用使用方法

使用了lambda简化了表达式进一步简化代码:

简化方法:在/app/build.gradle里面加入如下内容:(defaultConfig的外面)

代码语言:javascript
复制
compileOptions { 
sourceCompatibility JavaVersion.VERSION_1_8 
targetCompatibility JavaVersion.VERSION_1_8 
} 

接口调用(获得数据库实例):

代码语言:javascript
复制
//全局定义的实例获取类,以后想要换数据库,只需在这个类里切换即可 
public class Injection { 
public static DbSource getDbSource(Context context) { 
//choose one of them 
//return BriteDb.getInstance(context); 
return SimpleDb.getInstance(context); 
} 
} 
DbSource db = Injection.getInstance(mContext); 
disposable1 = db.getAllTask() 
.flatMap(Flowable::fromIterable) 
.filter(task -  {     //自定义过滤 
if (!task.getIsuploadend()) { 
return true; 
} else { 
return false; 
} 
}) 
.subscribe(taskItems -  //这里是使用了lambda简化了表达式 
doTaskProcess(taskItems) 
, throwable -  { 
throwable.printStackTrace(); 
},// onCompleted 
() -  { 
if (disposable1 != null && !disposable1.isDisposed()) { 
disposable1.dispose(); 
} 
}); 
disposable1 = db.getRunningTask() 
.filter(Optional::isPresent) //判断是否为空,为空的就跳过 
.map(Optional::get)    //获取到真的参数 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(taskItem -  {     //onNext() 
//has running task 
mTid = taskItem.getTid(); 
}, throwable -  throwable.printStackTrace() //onError() 
, () -  disposable1.dispose());    //onComplete() 
disposable1 = db.markUploadEnd(tid, isuploadend) 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(status -  {     //onNext() 
if (status) { 
//dosomething 
} 
}, throwable -  throwable.printStackTrace() //onError() 
, () -  disposable1.dispose());    //onComplete() 
disposable1 = db.deleteTask(tid) 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(status -  {     //onNext() 
if (status) { 
//dosomething 
} 
}, throwable -  throwable.printStackTrace() //onError() 
, () -  disposable1.dispose());    //onComplete() 

以上这篇Rxjava2_Flowable_Sqlite_Android数据库访问实例就是小编分享给大家的全部内容了,希望能给大家一个参考。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-09-11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档