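Jkes is a search framework built on Java, Kafka, and ElasticSearch. It provides annotation-driven, Hibernate-style object/document mapping and exposes a REST API for document queries.
How indexing works:
- Metadata is built for every entity annotated with @Document.
- Index and mapping configurations are produced in JSON format, and the index configuration is then created or updated through the ElasticSearch Java Rest Client.
- A Kafka ElasticSearch Connector is used to create and update documents.
- A Jkes Deleter Connector is used to delete documents.
- The data returned by "* save(*)" methods is wrapped in a SaveEvent and stored in the EventContainer; the arguments of "* delete*(..)" methods are used to generate a DeleteEvent or DeleteAllEvent, which is also stored in the EventContainer.
- JkesKafkaProducer sends the entity held in each SaveEvent to Kafka. The Kafka client serializes the data with the JkesJsonSerializer that Jkes provides before sending it.
- Unlike a SaveEvent, a DeleteEvent is serialized directly and sent to Kafka as a whole, rather than sending only the data it carries.
- Unlike SaveEvent and DeleteEvent, a DeleteAllEvent sends nothing to Kafka. Instead, the corresponding index is deleted directly through the ElasticSearch Java Rest Client, the index is then rebuilt, and the Kafka ElasticSearch Connector is restarted.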
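The three event types above are treated differently once a transaction has committed. The following is a minimal sketch of that dispatch in plain Java, not Jkes's actual code: the names EventDispatchSketch, Event, Kind, and dispatch are hypothetical, and the Kafka, ElasticSearch, and Kafka Connect calls are replaced by strings appended to a list so the flow can be followed without a running cluster.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: these are not the real Jkes classes.
class EventDispatchSketch {

    enum Kind { SAVE, DELETE, DELETE_ALL }

    static final class Event {
        final Kind kind;
        final String index;   // name of the target index
        final Object payload; // entity for SAVE, document id for DELETE, null for DELETE_ALL

        Event(Kind kind, String index, Object payload) {
            this.kind = kind;
            this.index = index;
            this.payload = payload;
        }
    }

    // Describes what would happen to each event once the transaction has committed.
    static List<String> dispatch(List<Event> committed) {
        List<String> actions = new ArrayList<>();
        for (Event e : committed) {
            switch (e.kind) {
                case SAVE:
                    // Only the wrapped entity is sent, not the event itself.
                    actions.add("kafka.send(entity=" + e.payload + ")");
                    break;
                case DELETE:
                    // The whole event is serialized and sent.
                    actions.add("kafka.send(deleteEvent{index=" + e.index + ", id=" + e.payload + "})");
                    break;
                case DELETE_ALL:
                    // Nothing goes to Kafka: drop the index, rebuild it, restart the connector.
                    actions.add("es.deleteIndex(" + e.index + ")");
                    actions.add("es.createIndex(" + e.index + ")");
                    actions.add("connect.restartConnector(" + e.index + ")");
                    break;
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(Kind.SAVE, "person", "Person#1"));
        events.add(new Event(Kind.DELETE, "person", 1L));
        events.add(new Event(Kind.DELETE_ALL, "person", null));
        for (String action : dispatch(events)) {
            System.out.println(action);
        }
    }
}
```

Keeping the dispatch in one place makes the asymmetry visible: saves and deletes are asynchronous messages, while a delete-all is a synchronous administrative operation.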
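How querying works:
- The query service takes a JSON request, preprocesses it, and forwards it to ElasticSearch through the ElasticSearch Java Rest Client. The response is parsed, processed further, and returned to the client.
To get up to speed with the framework quickly, see the jkes-integration-test project, a Spring Boot application that we use to test functional completeness.
- Install jkes-index-connector and jkes-delete-connector on the Kafka Connect classpath.
- Install the Smart Chinese Analysis Plugin:
sudo bin/elasticsearch-plugin install analysis-smartcn
Add the configuration: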
@EnableAspectJAutoProxy
@EnableJkes
@Configuration
public class JkesConfig {

    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) {
        return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport);
    }
}
Provide a JkesProperties bean:
@Component
@Configuration
@PropertySource("classpath:jkes.properties")
public class JkesConf extends DefaultJkesPropertiesImpl {

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    @PostConstruct
    public void setUp() {
        Config.setJkesProperties(this);
    }

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @Value("${kafka.connect.servers}")
    private String kafkaConnectServers;

    @Value("${es.bootstrap.servers}")
    private String esBootstrapServers;

    @Value("${document.base_package}")
    private String documentBasePackage;

    @Value("${jkes.client.id}")
    private String clientId;

    @Override
    public String getKafkaBootstrapServers() {
        return replaceDomainNameWithIp(this.kafkaBootstrapServers);
    }

    @Override
    public String getKafkaConnectServers() {
        return kafkaConnectServers;
    }

    @Override
    public String getEsBootstrapServers() {
        return replaceDomainNameWithIp(this.esBootstrapServers);
    }

    @Override
    public String getDocumentBasePackage() {
        return documentBasePackage;
    }

    @Override
    public String getClientId() {
        return clientId;
    }

    public void setKafkaBootstrapServers(String kafkaBootstrapServers) {
        this.kafkaBootstrapServers = kafkaBootstrapServers;
    }

    public void setKafkaConnectServers(String kafkaConnectServers) {
        this.kafkaConnectServers = kafkaConnectServers;
    }

    public void setEsBootstrapServers(String esBootstrapServers) {
        this.esBootstrapServers = esBootstrapServers;
    }

    public void setDocumentBasePackage(String documentBasePackage) {
        this.documentBasePackage = documentBasePackage;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    // mainly used for test. in inner env, use ip directly or domainName if dns parse is available
    private String replaceDomainNameWithIp(String u) {
        StringBuilder stringBuilder = new StringBuilder();
        String[] urls = u.split(",");
        Arrays.stream(urls).forEach(urlStr -> {
            if (urlStr.startsWith("http")) {
                try {
                    URL url = new URL(urlStr);
                    InetAddress address = InetAddress.getByName(url.getHost());
                    String ip = address.getHostAddress();
                    stringBuilder
                            .append(url.getProtocol()).append("://")
                            .append(ip).append(":")
                            .append(url.getPort())
                            .append(",");
                } catch (UnknownHostException | MalformedURLException e) {
                    throw new RuntimeException(e);
                }
            } else {
                String[] split = urlStr.split(":");
                try {
                    stringBuilder.append(HttpUtils.getIpsFromDomainName(split[0])).append(":").append(split[1]).append(",");
                } catch (UnknownHostException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        return stringBuilder.deleteCharAt(stringBuilder.length() - 1).toString();
    }
}
kafka.bootstrap.servers=k1-test.com:9292,k2-test.com:9292,k3-test.com:9292
kafka.connect.servers=http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084
es.bootstrap.servers=http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200
document.base_package=com.timeyang.jkes.integration_test.domain
jkes.client.id=integration_test
This part is flexible: if you use Spring Boot, you can use @ConfigurationProperties directly.
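Because the property values can come from any source, it can help to see the same keys read without Spring. The sketch below is an illustration only: JkesPropertiesSketch and its methods are hypothetical names, and it uses nothing but java.util.Properties to load the keys shown above and split the comma-separated server lists.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Jkes.
class JkesPropertiesSketch {

    // Parses properties-file text such as the contents of jkes.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not read properties", e);
        }
        return props;
    }

    // Splits a comma-separated server list and fails fast when the key is missing.
    static List<String> servers(Properties props, String key) {
        String raw = props.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        List<String> result = new ArrayList<>();
        for (String part : raw.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String text = "kafka.bootstrap.servers=k1-test.com:9292,k2-test.com:9292\n"
                + "jkes.client.id=integration_test\n";
        Properties props = parse(text);
        System.out.println(servers(props, "kafka.bootstrap.servers"));
        System.out.println(props.getProperty("jkes.client.id"));
    }
}
```

Failing fast on a missing key surfaces configuration mistakes at startup instead of at the first indexing call.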
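Add index management endpoints
Because we do not know which web technology the client application uses, the index endpoints have to be added on the client side. With Spring MVC, for example, they can be added as follows: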
@RestController
@RequestMapping("/api/search")
public class SearchEndpoint {

    private Indexer indexer;

    @Autowired
    public SearchEndpoint(Indexer indexer) {
        this.indexer = indexer;
    }

    @RequestMapping(value = "/start_all", method = RequestMethod.POST)
    public void startAll() {
        indexer.startAll();
    }

    @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST)
    public void start(@PathVariable("entityClassName") String entityClassName) {
        indexer.start(entityClassName);
    }

    @RequestMapping(value = "/stop_all", method = RequestMethod.PUT)
    public Map<String, Boolean> stopAll() {
        return indexer.stopAll();
    }

    @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT)
    public Boolean stop(@PathVariable("entityClassName") String entityClassName) {
        return indexer.stop(entityClassName);
    }

    @RequestMapping(value = "/progress", method = RequestMethod.GET)
    public Map<String, IndexProgress> getProgress() {
        return indexer.getProgress();
    }
}
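The endpoint above only delegates to an Indexer. To make the start/stop/progress contract more concrete, here is a rough sketch of how a thread-pool-backed indexer could behave. It is not the Jkes implementation: ThreadPoolIndexerSketch, register, and awaitAll are hypothetical, progress is reported as a plain "indexed/total" string instead of an IndexProgress object, and the actual indexing work is replaced by a counter.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not the Jkes Indexer.
class ThreadPoolIndexerSketch {

    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final Map<String, Long> totals = new ConcurrentHashMap<>();        // entity -> rows to index
    private final Map<String, AtomicLong> indexed = new ConcurrentHashMap<>(); // entity -> rows indexed so far
    private final Map<String, Future<?>> tasks = new ConcurrentHashMap<>();

    void register(String entityClassName, long totalRows) {
        totals.put(entityClassName, totalRows);
    }

    void start(String entityClassName) {
        final Long total = totals.get(entityClassName);
        if (total == null) {
            throw new IllegalArgumentException("Unknown entity: " + entityClassName);
        }
        final AtomicLong counter = new AtomicLong();
        indexed.put(entityClassName, counter);
        tasks.put(entityClassName, pool.submit(() -> {
            // A real indexer would load each row and publish it here.
            for (long i = 0; i < total && !Thread.currentThread().isInterrupted(); i++) {
                counter.incrementAndGet();
            }
        }));
    }

    void startAll() {
        for (String name : totals.keySet()) {
            start(name);
        }
    }

    // Returns true if a task for the entity was tracked; it is cancelled if still running.
    boolean stop(String entityClassName) {
        Future<?> task = tasks.remove(entityClassName);
        if (task == null) {
            return false;
        }
        task.cancel(true);
        return true;
    }

    Map<String, String> getProgress() {
        Map<String, String> progress = new TreeMap<>();
        for (Map.Entry<String, AtomicLong> e : indexed.entrySet()) {
            progress.put(e.getKey(), e.getValue().get() + "/" + totals.get(e.getKey()));
        }
        return progress;
    }

    // Blocks until every tracked task has finished.
    void awaitAll() {
        for (Future<?> task : tasks.values()) {
            try {
                task.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        ThreadPoolIndexerSketch indexer = new ThreadPoolIndexerSketch();
        indexer.register("Person", 1000);
        indexer.register("PersonGroup", 50);
        indexer.startAll();
        indexer.awaitAll();
        System.out.println(indexer.getProgress());
        indexer.shutdown();
    }
}
```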
Entity annotations
@Entity
@Document
public class Person extends AuditedEntity {

    // @Id will be identified automatically
    // @Field(type = FieldType.Long)
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    private String name;

    @Field(type = FieldType.Keyword)
    private String gender;

    @Field(type = FieldType.Integer)
    private Integer age;

    // don't add @Field to test whether ignored
    // @Field(type = FieldType.Text)
    private String description;

    @Field(type = FieldType.Object)
    @ManyToOne(fetch = FetchType.EAGER)
    @JoinColumn(name = "group_id")
    private PersonGroup personGroup;

    public Long getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public String getGender() {
        return gender;
    }

    public Integer getAge() {
        return age;
    }

    public String getDescription() {
        return description;
    }

    public PersonGroup getPersonGroup() {
        return personGroup;
    }
}
@Entity
@Document(type = "person_group", alias = "person_group_alias")
public class PersonGroup extends AuditedEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;

    private String interests;

    @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true)
    private List<Person> persons;

    private String description;

    @DocumentId
    @Field(type = FieldType.Long)
    public Long getId() {
        return id;
    }

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    public String getName() {
        return name;
    }

    @Field(type = FieldType.Text)
    public String getInterests() {
        return interests;
    }

    @Field(type = FieldType.Nested)
    public List<Person> getPersons() {
        return persons;
    }

    /**
     * No @Field annotation here, to test whether the property is ignored during serialization.
     */
    public String getDescription() {
        return description;
    }
}
Setter methods are omitted here.
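To illustrate the idea behind these annotations, the sketch below turns annotated fields into an ElasticSearch-style mapping fragment by reflection and skips fields that carry no annotation, as with description above. It is a simplified stand-in and makes no claim about the JSON Jkes actually generates: the @SketchField annotation, the MappingSketch class, and its nested Person class are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: @SketchField is a stand-in, not the Jkes @Field annotation.
class MappingSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface SketchField {
        String type();
    }

    static class Person {
        @SketchField(type = "long") Long id;
        @SketchField(type = "keyword") String gender;
        @SketchField(type = "integer") Integer age;
        String description; // no annotation, so it is left out of the mapping
    }

    // Builds a {"properties": {...}} fragment from the annotated fields of a class.
    static String mappingFor(Class<?> clazz) {
        List<String> properties = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            SketchField meta = field.getAnnotation(SketchField.class);
            if (meta == null) {
                continue; // unannotated fields are ignored
            }
            properties.add("\"" + field.getName() + "\":{\"type\":\"" + meta.type() + "\"}");
        }
        // Reflection does not guarantee field order, so sort for a stable result.
        Collections.sort(properties);
        return "{\"properties\":{" + String.join(",", properties) + "}}";
    }

    public static void main(String[] args) {
        System.out.println(mappingFor(Person.class));
    }
}
```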
URI query
curl -XPOST 'localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10'
Nested query
integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "nested": {
      "path": "persons",
      "score_mode": "avg",
      "query": {
        "bool": {
          "must": [
            {
              "range": {
                "persons.age": {
                  "gt": 5
                }
              }
            }
          ]
        }
      }
    }
  }
}
match query
integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "match": {
      "interests": "Hadoop"
    }
  }
}
bool query
{
  "query": {
    "bool" : {
      "must" : {
        "match" : { "interests" : "Hadoop" }
      },
      "filter": {
        "term" : { "name.raw" : "name0" }
      },
      "should" : [
        { "match" : { "interests" : "Flink" } },
        {
          "nested" : {
            "path" : "persons",
            "score_mode" : "avg",
            "query" : {
              "bool" : {
                "must" : [
                  { "match" : {"persons.name" : "name40"} },
                  { "match" : {"persons.interests" : "interests"} }
                ],
                "must_not" : {
                  "range" : {
                    "age" : { "gte" : 50, "lte" : 60 }
                  }
                }
              }
            }
          }
        }
      ],
      "minimum_should_match" : 1,
      "boost" : 1.0
    }
  }
}
Source filtering
integration_test_person_group/person_group/_search
{
  "_source": false,
  "query" : {
    "match" : { "name" : "name17" }
  }
}
integration_test_person_group/person_group/_search
{
  "_source": {
    "includes": [ "name", "persons.*" ],
    "excludes": [ "date*", "version", "persons.age" ]
  },
  "query" : {
    "match" : { "name" : "name17" }
  }
}
prefix
integration_test_person_group/person_group/_search
{
  "query": {
    "prefix" : { "name" : "name" }
  }
}
wildcard
integration_test_person_group/person_group/_search
{
  "query": {
    "wildcard" : { "name" : "name*" }
  }
}
regexp
integration_test_person_group/person_group/_search
{
  "query": {
    "regexp": {
      "name": "na.*17"
    }
  }
}
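jkes-core is the core of the whole jkes project. It mainly contains the following:
- The annotation package provides the core Jkes annotations.
- The elasticsearch package wraps ElasticSearch-related operations, such as creating or updating the indices for all documents and updating mappings.
- The kafka package provides the Kafka producer, the Kafka JSON serializer, and the Kafka Connect client.
- The metadata package builds the core annotation metadata and provides its structured model.
- The event package provides the event model and the event container.
- The exception package provides common Jkes exceptions.
- The http package wraps common HTTP JSON requests on top of Apache Http Client.
- The support package exposes the core Jkes configuration support.
- The util package provides utility classes for development, such as Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, and StringUtils.
jkes-boot integrates Jkes with third-party open-source frameworks.
Currently, jkes-spring-data-jpa provides integration with Spring Data JPA. Using Spring's AOP mechanism, it intercepts Repository methods and generates SaveEvent/DeleteEvent/DeleteAllEvent instances that are stored in the EventContainer. The SearchPlatformTransactionManager we provide wraps commonly used transaction managers (such as JpaTransactionManager) to add transaction interception.
Later versions will add integrations with more frameworks.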
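The wrapping of a transaction manager described above is an instance of the decorator pattern. The following is a minimal sketch under stated assumptions, not the real SearchPlatformTransactionManager: TxManager is a hypothetical two-method stand-in for Spring's PlatformTransactionManager, EventBuffer stands in for the EventContainer, and a plain list stands in for Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a transaction manager decorator; none of these types are Jkes or Spring types.
class TxWrapperSketch {

    interface TxManager {
        void commit();
        void rollback();
    }

    // A delegate that does nothing, used here in place of a real JPA transaction manager.
    static final class NoopTx implements TxManager {
        public void commit() { }
        public void rollback() { }
    }

    // Per-thread buffer of events collected while a transaction is open.
    static final class EventBuffer {
        private final ThreadLocal<List<String>> pending = ThreadLocal.withInitial(ArrayList::new);

        void add(String event) {
            pending.get().add(event);
        }

        // Returns the buffered events and clears the buffer for the current thread.
        List<String> drain() {
            List<String> events = new ArrayList<>(pending.get());
            pending.remove();
            return events;
        }
    }

    static final class SearchTxManager implements TxManager {
        private final TxManager delegate;
        private final EventBuffer buffer;
        private final List<String> published; // stands in for the Kafka producer

        SearchTxManager(TxManager delegate, EventBuffer buffer, List<String> published) {
            this.delegate = delegate;
            this.buffer = buffer;
            this.published = published;
        }

        public void commit() {
            List<String> events = buffer.drain(); // always clear the buffer
            delegate.commit();                     // if this throws, the events are dropped
            published.addAll(events);              // publish only after a successful commit
        }

        public void rollback() {
            buffer.drain();                        // discard events of the failed transaction
            delegate.rollback();
        }
    }

    public static void main(String[] args) {
        List<String> published = new ArrayList<>();
        EventBuffer buffer = new EventBuffer();
        TxManager tx = new SearchTxManager(new NoopTx(), buffer, published);

        buffer.add("save:Person#1");
        tx.rollback();
        buffer.add("save:Person#2");
        tx.commit();
        System.out.println(published);
    }
}
```

Publishing only after the delegate has committed keeps the search index from receiving data that the database never stored.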
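Notes on jkes-spring-data-jpa:
- The ContextSupport class obtains Repository beans from the bean factory.
- @EnableJkes lets clients switch on Jkes easily and follows a configuration model consistent with Spring's.
- EventSupport handles the details of events: it generates the corresponding events when data is saved or deleted, stores them in the EventContainer, and processes them when the transaction commits or rolls back.
- SearchPlatformTransactionManager wraps the client's transaction manager and adds callback hooks on commit and rollback.
- The audit package provides a simple AuditedEntity parent class that makes it easy to add auditing. Its version information can be combined with ElasticSearch's versioning mechanism to ensure that stale document data is never indexed.
- The exception package wraps common exceptions.
- The intercept package provides the AOP pointcuts and aspects.
- The index package provides full indexing. Two indexing mechanisms are currently available: one based on a thread pool and one based on ForkJoin.
jkes-services mainly provides services.
Currently, jkes-services provides the following services: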
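jkes-delete-connector
jkes-delete-connector is a Kafka Connector that fetches index deletion events (DeleteEvent) from the Kafka cluster and then uses the Jest Client to delete the corresponding documents from ElasticSearch.
With the REST admin API of Kafka Connect, document deletion on a multi-tenant platform is easy to implement: starting one jkes-delete-connector per project is enough to handle that project's document deletions automatically. This avoids having to start a Kafka consumer by hand for every new project. Regex topic subscription could reduce that work, but it is still very inflexible.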
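Starting one connector per project through the Kafka Connect REST API comes down to posting a small JSON document to the /connectors endpoint. The sketch below only builds such a request body. The keys name, config, connector.class, tasks.max, and topics are standard Kafka Connect fields, but the connector class name, the connector naming scheme, and the topic naming scheme are placeholders invented for this example and are not taken from Jkes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: builds a Kafka Connect "create connector" request body for one project.
class DeleteConnectorRequestSketch {

    static String createRequestBody(String project) {
        Map<String, String> config = new LinkedHashMap<>();
        // Placeholder class name, not the real jkes-delete-connector class.
        config.put("connector.class", "com.example.HypotheticalDeleteSinkConnector");
        config.put("tasks.max", "1");
        // Assumed topic naming scheme, for illustration only.
        config.put("topics", project + "_delete");

        StringBuilder json = new StringBuilder();
        json.append("{\"name\":").append(quote(project + "_delete_connector")).append(",\"config\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (!first) {
                json.append(',');
            }
            json.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return json.append("}}").toString();
    }

    // Minimal JSON string quoting: escapes backslashes and double quotes only.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        // The resulting body would be sent with POST to <kafka.connect.servers>/connectors.
        System.out.println(createRequestBody("integration_test"));
    }
}
```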
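jkes-search-service
jkes-search-service is a RESTful search service that provides a multi-version REST query API. The versioned API supports API evolution and compatibility.
jkes-search-service currently supports URI-style search and JSON request body search. It takes a JSON request, preprocesses it, and forwards it to ElasticSearch through the ElasticSearch Java Rest Client; the response is parsed, processed further, and returned to the client.
In the future, we will build an index cluster on top of ZooKeeper to provide cluster-wide index management.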
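One small part of the preprocessing step can be illustrated with the request paths used in the query examples, such as /api/v1/integration_test_person_group/person_group/_search. The sketch below assumes that the service strips the /api/v{n} prefix and forwards the remainder of the path to ElasticSearch unchanged. That behavior is an assumption made for illustration, and SearchPathSketch with its methods is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of mapping a versioned API path to an ElasticSearch path.
class SearchPathSketch {

    // Matches "/api/v<number>" followed by the rest of the path and query string.
    private static final Pattern VERSIONED = Pattern.compile("^/api/v(\\d+)(/.+)$");

    static int apiVersion(String pathAndQuery) {
        return Integer.parseInt(match(pathAndQuery).group(1));
    }

    static String toElasticsearchPath(String pathAndQuery) {
        return match(pathAndQuery).group(2);
    }

    private static Matcher match(String pathAndQuery) {
        Matcher m = VERSIONED.matcher(pathAndQuery);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a versioned search path: " + pathAndQuery);
        }
        return m;
    }

    public static void main(String[] args) {
        String request = "/api/v1/integration_test_person_group/person_group/_search?from=3&size=10";
        System.out.println(apiVersion(request));
        System.out.println(toElasticsearchPath(request));
    }
}
```

Extracting the version separately leaves room for version-specific preprocessing before the request is forwarded.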
jkes-integration-test is a Spring Boot based integration test project used for functional testing. It also measures the throughput of some common operations.