파이썬으로 엘라스틱 서치 쿼리하기

파이썬으로 엘라스틱 서치를 쿼리하는 방법에 대해 알아봅시다.

엘라스틱 서치를 파이썬으로 쿼리하는 방법에 대해 알아보겠습니다.

현업에서 엘라스틱 서치를 통해 정보를 받고 Kibana를 통해 시각화를 많이 하고 있는데요, 이러한 로그 정보들을 파이썬을 통해 분석하고 싶은 경우가 있습니다. 예를 든다면 dau(daily activate user)의 정보를 수집하고 있는데 몇시에 가장 많이 접속하는지, 이런 정보를 가공해 다른 곳에 사용한다던지 말이죠.

이러한 정보를 키바나를 통해 확인할 수 있지만, 별도의 레포트를 만들 경우 seaborn이나 matplotlib을 통해 시각화를 진행할때가 많습니다. 그렇다면 어떻게 엘라스틱 서치로 저장된 정보를 파이썬으로 쿼리할 수 있을 지 알아보겠습니다.

여기서는 scroll 메서드를 사용해 순차적으로 저장된 모든 정보를 불러오고 json 파일로 저장하는 방법을 살펴봅니다.

예제

아래 예제에서는 dau 라는 이름을 가진 인덱스에서 product_id=2021 인 정보를 모두 쿼리합니다.

1
2
3
4
5
6
7
import json
from datetime import datetime
from elasticsearch import Elasticsearch
host = "받아올 엘라스틱 서치 URI"
es = Elasticsearch(host)
indices = ['dau'] # 인덱스 명
size = 10000 # 한 번에 받아올 데이터 갯수

여기서 host 는 본인이 사용하고 있는 엘라스틱 서치 서비스의 URI입니다. size=10000 은 한 번에 1만개씩 받아오겠다는 의미입니다.

1
2
3
4
5
6
7
8
doc = {
    'size': size,
    'query': {
        'match': {
            'product_id': 2021, 
        }
    }
}

doc에 쿼리 정보를 담아줍니다.

1
2
3
4
# 이전 스크롤 1초 저장
response = es.search(index=indices, body=doc, scroll='1s')
old_scroll_id = response['_scroll_id']
result = []

response 에 엘라스틱 서치 search 메서드를 정의합니다. old_scroll_id 에 초기 스크롤 id를 정의하고 결과를 받을 빈 리스트를 정의합니다.(result)

1
2
for d in response['hits']['hits']:
    result.append(d['_source'])

가장 처음 쿼리를 저장합니다.

1
2
3
4
5
6
while len(response['hits']['hits']):
    response = es.scroll(scroll_id=old_scroll_id, scroll='1s')
    for d in response['hits']['hits']:
        result.append(d['_source'])
    old_scroll_id = response['_scroll_id'] # scroll id 초기화
    print(f'Result Length: {len(result)}')

이후 반복문을 통해 더 이상 받아올 정보가 없을 때 까지 탐색하여 정보를 저장합니다.

1
2
3
# save file
with open('es_dau.json', 'w') as f:
    json.dump(result, f)

마지막으로 result 를 json 형식의 파일로 저장하면 이후 자유롭게 어디서나 쿼리한 정보를 받아올 수 있습니다.

전체코드

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import json
from datetime import datetime
from elasticsearch import Elasticsearch
host = "사용할 엘라스틱 서치 URL"
es = Elasticsearch(host)
indices = ['dau']
size = 10000

doc = {
    'size': size,
    'query': {
        'match': {
            'product_id': 2021, 
        }
    }
}

# 이전 스크롤 1초 저장
response = es.search(index=indices, body=doc, scroll='1s') 
old_scroll_id = response['_scroll_id']
result = []

for d in response['hits']['hits']:
    result.append(d['_source'])

while len(response['hits']['hits']):
    response = es.scroll(scroll_id=old_scroll_id, scroll='1s')
    for d in response['hits']['hits']:
        result.append(d['_source'])
    old_scroll_id = response['_scroll_id'] # scroll id 초기화
    print(f'Result Length: {len(result)}')

# save file
with open('es_dau.json', 'w') as f:
    json.dump(result, f)
Hugo로 만듦
JimmyStack 테마 사용 중