服务器之家:专注于服务器技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|

服务器之家 - 数据库 - Mysql - MySQL 与 Elasticsearch 数据不对称问题解决办法

MySQL 与 Elasticsearch 数据不对称问题解决办法

2020-08-12 16:35MYSQL教程网 Mysql

这篇文章主要介绍了MySQL 与 Elasticsearch 数据不对称问题解决办法的相关资料,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作,这里提供解决办法,需要的朋友可以参考下

MySQLElasticsearch 数据不对称问题解决办法

jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。

当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mysql> desc article;
+-------------+--------------+------+-----+--------------------------------+-------+
| Field    | Type     | Null | Key | Default            | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id     | int(11)   | NO  |   | 0               |    |
| title    | mediumtext  | NO  |   | NULL              |    |
| description | mediumtext  | YES |   | NULL              |    |
| author   | varchar(100) | YES |   | NULL              |    |
| source   | varchar(100) | YES |   | NULL              |    |
| content   | longtext   | YES |   | NULL              |    |
| status   | enum('Y','N')| NO  |   | 'N'              |    |
| ctime    | timestamp  | NO  |   | CURRENT_TIMESTAMP       |    |
| mtime    | timestamp  | YES |   | ON UPDATE CURRENT_TIMESTAMP  |    |
+-------------+--------------+------+-----+--------------------------------+-------+
7 rows in set (0.00 sec)

logstash 增加 mtime 的查询规则

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
jdbc {
  jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
  jdbc_user => "cms"
  jdbc_password => "password"
  schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次
  statement => "select * from article where mtime > :sql_last_value"
  use_column_value => true
  tracking_column => "mtime"
  tracking_column_type => "timestamp"
  record_last_run => true
  last_run_metadata_path => "/var/tmp/article-mtime.last"
 }

创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。

?
1
2
3
4
5
CREATE TABLE `elasticsearch_trash` (
 `id` int(11) NOT NULL,
 `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

为 article 表创建触发器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
 -- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。
 IF NEW.status = 'N' THEN
 insert into elasticsearch_trash(id) values(OLD.id);
 END IF;
 -- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。
  IF NEW.status = 'Y' THEN
 delete from elasticsearch_trash where id = OLD.id;
 END IF;
END
 
CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
 -- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。
 insert into elasticsearch_trash(id) values(OLD.id);
END

接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。

你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。

实体

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package cn.netkiller.api.domain.elasticsearch;
 
import java.util.Date;
 
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
 
@Entity
@Table
public class ElasticsearchTrash {
 @Id
 private int id;
 
 @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
 private Date ctime;
 
 public int getId() {
 return id;
 }
 
 public void setId(int id) {
 this.id = id;
 }
 
 public Date getCtime() {
 return ctime;
 }
 
 public void setCtime(Date ctime) {
 this.ctime = ctime;
 }
 
}

仓库

?
1
2
3
4
5
6
7
8
9
10
package cn.netkiller.api.repository.elasticsearch;
 
import org.springframework.data.repository.CrudRepository;
 
import com.example.api.domain.elasticsearch.ElasticsearchTrash;
 
public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{
 
 
}

定时任务

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package cn.netkiller.api.schedule;
 
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
 
import com.example.api.domain.elasticsearch.ElasticsearchTrash;
import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;
 
@Component
public class ScheduledTasks {
 private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
 
 @Autowired
 private TransportClient client;
 
 @Autowired
 private ElasticsearchTrashRepository alasticsearchTrashRepository;
 
 public ScheduledTasks() {
 }
 
 @Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务
 public void cleanTrash() {
 for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {
  DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
  RestStatus status = response.status();
  logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
  if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
  alasticsearchTrashRepository.delete(elasticsearchTrash);
  }
 }
 }
}

Spring boot 启动主程序。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package cn.netkiller.api;
 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
 
@SpringBootApplication
@EnableScheduling
public class Application {
 
 public static void main(String[] args) {
 SpringApplication.run(Application.class, args);
 }
}

以上就是MySQL 与 Elasticsearch 数据不对称问题解决办法的讲解,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

原文链接:https://my.oschina.net/neochen/blog/1518679

延伸 · 阅读

精彩推荐
  • MysqlMySQL优化insert性能的方法示例

    MySQL优化insert性能的方法示例

    对于一些数据量较大的系统,数据库面临的问题除了查询效率低下,还有就是数据入库时间长。下面这篇文章主要给大家介绍了关于MySQL优化insert性能的相...

    傲雪星枫3782019-07-11
  • Mysqlmysql数据库互为主从配置方法分享

    mysql数据库互为主从配置方法分享

    共有四台机器:A(10.1.10.28),B(10.1.10.29),C(10.1.10.30),D(10.1.10.31)。配置后结果:A-C互为主从,B为A的slave,D为C的slave ...

    MYSQL教程网3172019-12-01
  • Mysql解决MySQL中的Slave延迟问题的基本教程

    解决MySQL中的Slave延迟问题的基本教程

    这篇文章主要介绍了解决MySQL中的Slave延迟问题的基本教程,文中针对不同情况给出了一些具体的解决方法,需要的朋友可以参考下 ...

    叶金荣3652020-05-23
  • Mysqlmysql存储过程详解

    mysql存储过程详解

    我们常用的操作数据库语言SQL语句在执行的时候需要要先编译,然后执行,而存储过程(Stored Procedure)是一组为了完成特定功能的SQL语句集,经编译后存储...

    MYSQL教程网4592019-12-04
  • MysqlMySQL中有哪些情况下数据库索引会失效详析

    MySQL中有哪些情况下数据库索引会失效详析

    这篇文章主要给大家介绍了关于MySQL中有哪些情况下数据库索引会失效的相关资料,文中通过图文介绍的非常详细,对大家学习或者使用mysql具有一定的参考...

    MicroHeart!1582019-07-07
  • MysqlMySQL 4种导入数据的方法

    MySQL 4种导入数据的方法

    这篇文章主要介绍了MySQL 导入 导出数据的方法,文中讲解非常详细,代码帮助大家更好的理解和学习,感兴趣的朋友可以了解下...

    菜鸟教程1892020-07-05
  • MysqlMySQL优化GROUP BY方案

    MySQL优化GROUP BY方案

    满足GROUP BY子句的最一般的方法是扫描整个表并创建一个新的临时表,表中每个组的所有行应为连续的,然后使用该临时表来找到组并应用累积函数(如果有...

    hebedich1812020-04-11
  • MysqlMySQL 5.7.13 源码编译安装配置方法图文教程

    MySQL 5.7.13 源码编译安装配置方法图文教程

    这篇文章主要介绍了MySQL 5.7.13 源码编译安装配置方法图文教程,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    xyang09172942020-07-12