数据共享-spring batch(9)上下文处理 还在手工生成数据库文档?3个步骤自动完成了解一下 python处理Excel文件 python基本操作-文件、目录及路径 MinIO 的分布式部署 利用 MinIO 轻松搭建静态资源服务 搞定SpringBoot多数据源(3):参数化变更源 搞定SpringBoot多数据源(2):动态数据源 搞定SpringBoot多数据源(1):多套源策略 java开发必学知识:动态代理 springboot+apache前后端分离部署https springboot+logback日志输出企业实践(下) springboot+logback日志输出企业实践(上) springboot+swagger接口文档企业实践(下) springboot+swagger接口文档企业实践(上) 查阅了十几篇学习资源后,我总结了这份AI学习路径 java应用监测(8)-阿里诊断工具arthas java应用监测(7)-在线动态诊断神器BTrace java应用监测(6)-第三方内存分析工具MAT java应用监测(5)-可视化监测工具 java应用监测(4)-线上问题排查套路 java应用监测(3)-这些命令行工具你掌握了吗 java应用监测(2)-java命令的秘密 java应用监测(1)-java程序员应该知道的应用监测技术 mongo同步-spring batch(8)的mongo读写组件使用 使用docsify构建专业文档网站(下) 使用docsify构建专业文档网站(上) 调度与监控-spring batch(7)结合xxl-job进行批处理 增量同步-spring batch(6)动态参数绑定与增量同步 便捷的数据读写-spring batch(5)结合beetlSql进行数据读写 决战数据库-spring batch(4)数据库到数据库 快速使用组件-spring batch(3)读文件数据到数据库 快速了解组件-spring batch(2)之helloworld 数据批处理神器-Spring Batch(1)简介及使用场景 部署工具(deploy-tool)开源 java服务安装(三):使用appassembler java服务安装(二):使用commons-daemon java服务安装(一):使用java service wrapper及maven打zip包

mongo同步-spring batch(8)的mongo读写组件使用

2019年08月09日

tags: springbatch mongodb


1.引言

之前对Spring Batch的通过实例的方式进行了介绍,有兴趣的可见以下文章:

除了文件及关系型数据库的数据同步,Spring Batch的读组件(ItemReader),处理组件(ItemProcessor),写组件(ItemWriter)支持丰富的数据类型,其中MongoItemReaderMongoItemWriter是针对mongo的读写组件,用户可以直接使用,进行Mongodb的数据读写操作。一种比较常用的情景是从关系型数据库(如mysql)把数据同步到mongodb中,下面通过实例对mysqlmongodb的数据同步进行讲解。本文主要讲解有关Mongodb的操作,对于Spring Batch使用beetlsql进行关系数据库数据读取的操作请见文章《便捷的数据读写-spring batch(5)结合beetlSql进行数据读写》。本文的示例代码见github示例仓库

2.开发环境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 开发IDE: IDEA
  • 构建工具Maven: 3.3.9
  • 日志组件logback:1.2.3
  • lombok:1.18.6
  • MySQL: 5.6.26
  • Mongodb:4.0.10

3.开发流程

3.1 示例数据库及目标数据库

本示例的流程如下所示:

流程

示例工程中的sql目录有相应的关系数据库脚本,mytest.sql脚本创建一个test_user表,并有相应的测试数据。mongodb的安装可见官方文档,建立相应的存放数据的Collection,本示例为mytest

3.2 添加maven依赖及配置mongodb连接地址

由于需要使用mongodb的操作,因此需要添加它的依赖。如下所示:

<!-- mongodb -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>

添加依赖后,mongodb的连接地址需配置在配置文件中,若有用户名密码,则同样需要配置。如下:

spring.data.mongodb.uri=mongodb://192.168.222.10/mytest
# spring.data.mongodb.username=
# spring.data.mongodb.password=

3.3 编写mongodb的读写组件

按示例,共三个组件,需要的是一个读mysql数据库的组件,一个mysql数据库实体转化为mongodb的处理组件,一个写入mongodb的写组件,代码结构如下图所示:

其中ItemReader组件和ItemProcessor组件无须多讲,可参考之前的文章,这里主要讲一下mongodbItemWriter,此写入组件通过继承MongoItemWriter,编写自己的逻辑即可,而Spring Batch提供的mongodb写操作,是在初始化ItemWriter时,通过MongoOperations引入的,因此,MongoBatchConfig文件中,添加以下代码:

@Bean
public ItemWriter mongoWriter(MongoOperations mongoTemplate) {
    UserItemWriter userItemWriter = new UserItemWriter();
    userItemWriter.setTemplate(mongoTemplate);
    userItemWriter.setCollection("user");
    return userItemWriter;
}

其中,MongoOperations是在初始化时注入,在自定义的UserItemWriter中,设置templatecollection即可。若逻辑简单,不写自定义的ItemWriter,也可以直接使用MongoItemWriterBuilder ,直接构建MongoItemWriter,如下所示:

return new MongoItemWriterBuilder<MongoUser>()
                .collection("user")
                .template(mongoTemplate)
                .build();

以上是写组件的构建,同理,对于mongodb的读组件,构建方式类似,只是需要注意一下动态参数的配置,如下示例代码是查询数据,并返回map,参数是在构建任务时动态传入的。

@Bean
@StepScope
public MongoItemReader<Map> tweetsItemReader(MongoOperations mongoTemplate,@Value("#{jobParameters['hashTag']}") String hashtag) {
return new MongoItemReaderBuilder<Map>()
    .name("tweetsItemReader")
    .targetType(Map.class)
    .jsonQuery("{ \"entities.hashtags.text\": { $eq: ?0 }}")
    .collection("tweets_collection")
    .parameterValues(Collections.singletonList(hashtag))
    .pageSize(10)
    .sorts(Collections.singletonMap("created_at", Sort.Direction.ASC))
    .template(mongoTemplate)
    .build();
}

4.执行结果

编写单元测试或者在Controller编写启动任务,即可进行数据同步测试,执行结果如下所示:

5.总结

本文基于Spring Batch对数据从mysqlmongodb进行数据同步,通过结合示例代码,实现mongodb的读写组件进行编写及配置,希望需要使用Spring Batch进行关系数据库和mongodb进行批处理任务开发的人员有帮助。