2020. 4. 7. 17:19ㆍNifi
##Flow example
위 FLOW에서 최종 Processor는 PUTSQL이다.
최종 Processor에서 FLOW가 끝났다는 것을 보장하기 위해서는 DROP과 SEND가 쌍으로 이루어져야 있어야한다. SEND의 의미가 RedShift로 Record값들을 보냈다는 것을 의미하고 SEND가 완료되면 NIFI에서 해당 데이터 셋을 DROP한다 그러나 SEND가 없이 DROP이 이루어 진다면 Record값이 PUT이 안된것으로 볼 수 있다.
해당값들을 파싱하여 볼 수 있도록 하는것이 이번 장의 목표이다.
#1 Data provenance 확인
#2 Data Provenance REST API 호출 Command
curl 'http://10.200.100.120:8080/nifi-api/provenance' -H 'Content-Type: application/json' -H 'Accept: application/json, text/javascript, */*; q=0.01' --data-binary '{"provenance":{"request":{"maxResults":1000,"searchTerms":{"ProcessorID":"9b563125-5ac7-1317-8fc1-74d674222040"}}}}' --compressed
#ProcessorId는 직접 파악해야한다.
#3결과 개수 6개 확인
##참조
API 호출을 해보면 ID 의 종류가 엄청나게 다양하다. ID는 두가지로 나뉜다. Nifi WebUI에서 조회할 수 있는 ID와 API 호출을 통해서만 볼 수 있는 ID ProcessorID, ConnectionID, FlowFilleUUID, ClusterNodeId는 NIFI UI에서 조회가 가능하지만 그 밖의 ClientID, Version, Provenance Linage ID ProvenanceID 등은 API호출을 통해서 확인해야한다.
#4 Python으로 파싱하여 Print
Curl 명령어를 Python3 코드로 바꿔주는 사이트
import requests
import json
import os
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json, text/javascript, */*; q=0.01',
}
data = '{"provenance":{"request":{"maxResults":50,"searchTerms":{"Filename":"test2.txt","ProcessorID":"9b563125-5ac7-1317-8fc1-74d674222040"}}}}'
url = 'http://10.200.100.120:8080/nifi-api/provenance'
response = requests.post(url, headers=headers, data=data)
pro = response.json()
subdata = pro['provenance']
delete_id = subdata['id']
os.system("curl -X DELETE http://10.200.100.120:8080/nifi-api/provenance/" + delete_id )
##curl –X DELETE를 해야하는 이유 동적인Id(provenanveid, lineageId etc)가 11개 이상이 되면 더 이상 API 를 호출할 수 없도록 설정 되어있다. 해당 설정을 바꾸기 위해 해당 Jar파일을 찾았지만 참조 하고 있는 다른 Jar파일들이 많아 모두 컴파일이 필요한 상황이어서 아예 처음 호출시 삭제해주는 Curl –X DELETE를 넣어주었다.
subdata2 = subdata['results']
subdata3 = subdata2['provenanceEvents']
for n in subdata3:
#time = (n['eventTime'])
#type = (n['eventType'])
print (n['eventTime'], ":" ,n['eventType'])
#5 Python 파일 실행
시간대와 EventType을 출력 및 결과 비교
(base) [root@nifi00 ~]# python provenence.py
{}04/06/2020 18:50:04.440 KST : DROP
04/06/2020 18:50:04.408 KST : SEND
04/06/2020 14:49:12.930 KST : ATTRIBUTES_MODIFIED
04/06/2020 14:49:12.929 KST : DROP
04/06/2020 14:19:02.210 KST : DROP
04/06/2020 14:19:02.172 KST : SEND
'Nifi' 카테고리의 다른 글
NIFI Disk Guide, Disk Full일 경우 증상 및 예방 정책 (3) | 2020.06.09 |
---|---|
NIFI 모든 노드에 작업 분배하기 (Using ListSFTP -> RPG / Input Port -> FetchSFTP) (3) | 2020.04.16 |
Nifi를 통하여 File형태를 AWS Redshift에 LOAD (0) | 2020.04.07 |
Nifi를 통하여 AWS RedShift Connection 및 load data (2) | 2020.04.06 |
NIFI Rest API를 활용한 Processor 호출 및 Properties 변경 (1) | 2020.04.04 |