Home >> SmartFactory NonUI +- 1. 파생항목 +- 2. 메시지생성 +- 3. Send TC to Kafka +- 4. Skip TC |
applicationContext.xml
Glue 기반에서 kafka에 데이터를 보내는 경우 applicationContext.xml에 PosKafkaProducer 클래스가 등록되어 있어야 합니다.
<bean id="kafkaSender" class="com.posco.reuse.kafka.clients.PosKafkaProducer"> <property name="propertiesMap"> <map> <entry key="bootstrap.servers" value="192.168.193.141:9092,192.168.193.142:9092" /> </map> </property> </bean>
PosKafkaProducer는 kafka에서 제공하는 KafkaProducer를 이용하는데 KafkaProducer에 설정되는 property값을 propertiesMap을 통해서 설정할 수 있습니다. kafka에서 제공하는 api라이브러리는 JDK 1.7 이상에서만 동작합니다.
PosKafkaProducer 를 통에서 kafka에 데이터를 보내는 테스트용 소스
PosKafkaProducer pro = (PosKafkaProducer) GlueStaticContext.getBeanFactory().getBeanObject("kafkaSender"); SFMessage.Builder builder = SFMessage.newBuilder(); builder.setTcID( "GlueTest1" ); List<CharSequence> keys = new ArrayList<CharSequence>(); keys.add( "key1" ); builder.setKeys( keys ); Map<CharSequence, List<CharSequence>> datas = new LinkedHashMap<CharSequence, List<CharSequence>>(); List<CharSequence> data1 = new ArrayList<CharSequence>(); data1.add( "data1" ); datas.put( "key", data1 ); builder.setDatas( datas ); try { pro.sendMessage("GLUE_TEST03", AvroHelper.marshalAvro( SFMessage.getClassSchema(), builder.build() )); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); }
PosKafkaProducer에서 sendMessage를 통해서 kafka에 데이타를 보내는 로직은 메인 쓰레드 이외의 별도 쓰레드에서 실행되는 async 방식으로 실행되며 데이타 전송이 완료 되면 아래와 같은 로그가 info 레벨에서 남습니다.
"message(" + messageId + ") sent to partition(" + metadata.partition() + "), " + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"
전송할 곳의 kafka의 버전과 Glue에서 사용하는 kafka 라이브러리의 버전이 일치하지 않으면 데이타 전송시 에러가 발생하므로 버전을 꼭 확인해야 함.
데이터 전송을 kafka tool을 통해서 할 경우 해당 topic에서 Messages의 [Total number of messages]의 값은 2건 이상의 데이터가 왔을때 부터 정확히 표시됨을 유의해야 합니다.
수신된 내용을 단순 건수가 아니라 구체적인 내용이 보고 싶으신 경우에는 RTP 의 도움을 받으면 될것으로 보입니다.
SFMessageSend
PosKafkaProducer 를 통에서 byte[] 데이타를 Kafka에 송신하는 기능을 수행하는 리유즈 액티비티
Activity Property
Kafka의 GLUE_TEST04라는 topic으로 데이타를 송신하는 경우
applicationContext.xml에 등록되어 있는 PosKafkaProducer bean id 는 kafkaSender이고
Kafka에 전송할 데이터는 sfmessage를 Key로 GlueContext에 등록되어 있는 경우
<activity name="sendMessage" class="com.posco.reuse.kafka.activity.SFMessageSend"> <transition name="success" value="end"/> <property name="sender-id" value="kafkaSender"/> <property name="topic" value="GLUE_TEST04"/> <property name="message-key" value="sfmessage"/> </activity>
Kafka의 topic 값이 GlueContext에 topic을 Key로 등록되어 있는 경우
<activity name="sendMessage" class="com.posco.reuse.kafka.activity.SFMessageSend"> <transition name="success" value="end"/> <property name="sender-id" value="kafkaSender"/> <property name="topic-key" value="topic"/> <property name="message-key" value="sfmessage"/> </activity>
SFMessageCreate
avro형식의 SFMessage형의 객체를 생성하여 해당 객체를 byte[]로 변환하여 GlueContext에 저장하는 리유즈 액티비티
Activity Property
<activity name="createMessage01" class="com.posco.reuse.kafka.activity.SFMessageCreate"> <transition name="success" value="sendMessage"/> <property name="keys" value="a|b|c|d|e"/> <property name="data1" value="d1|d1-1|d1-2"/> <property name="data2" value="d2"/> <property name="datas-keys" value="a1|a2|a3|a4"/> <property name="data3" value="d3"/> <property name="tc-id" value="Test01"/> <property name="data4" value="d4"/> <property name="result-key" value="sfmessage"/> </activity>
tcid : Test01 keys : [a, b, c, d, e] datas : {a1=[d1, d1-1, d1-2], a2=[d2], a3=[d3], a4=[d4]}
<activity name="createMessage02" class="com.posco.reuse.kafka.activity.SFMessageCreate"> <transition name="success" value="sendMessage"/> <property name="data1-ctx" value="dc1|dc1-1|dc1-2"/> <property name="data2-ctx" value="dc2"/> <property name="datas-keys-ctx" value="datakeys"/> <property name="data3-ctx" value="dc3"/> <property name="data4-ctx" value="dc4"/> <property name="tc-id" value="Test02"/> <property name="keys-ctx" value="testkeys"/> <property name="result-key" value="sfmessage"/> </activity>
String[] testkeys = {"t1","t2","t3"}; String[] datakeys = {"d1","d2","d3","d4"}; ctx.put( "testkeys", testkeys ); ctx.put( "datakeys", datakeys ); ctx.put( "dc1", "01" ); ctx.put( "dc2", "02" ); ctx.put( "dc3", "03" ); ctx.put( "dc4", "04" ); ctx.put( "dc1-1", "01-1" ); ctx.put( "dc1-2", "01-2" );
tcid : Test02 keys : [t1, t2, t3] datas : {d1=[01, 01-1, 01-2], d2=[02], d3=[03], d4=[04]}
<activity name="createMessage03" class="com.posco.reuse.kafka.activity.SFMessageCreate"> <transition name="success" value="sendMessage"/> <property name="glue-message-convert" value="true"/> <property name="result-key" value="sfmessage"/> </activity>
GlueMessage message = new GlueMESMessageImpl(); message.setTCID("Test03"); message.put("name", "cho"); message.put("addr", "buchoon1234567890");; message.put("phonenum", "01099991234"); message.put("num1", new BigDecimal("12.345")); message.put("time", new Timestamp(System.currentTimeMillis())); List<Object> newGroupAttrs = new ArrayList<Object>(); newGroupAttrs.add("groupvalue"); newGroupAttrs.add(new BigDecimal("34.789")); newGroupAttrs.add(new Timestamp(System.currentTimeMillis())); message.put("group", newGroupAttrs); ctx.setMessage(message);
tcid : Test03 keys : [time, name, addr, group, phonenum, num1] datas : {addr=[buchoon1234567890], time=[20160304104207], phonenum=[01099991234], group=[groupvalue, 34.789, 20160304104207], num1=[12.345], name=[cho]}
avro - SFMessage 관련 클래스는 iot-site-schema-1.0.0.jar 에 포함되어 있습니다.
현재 데이터 전송 포멧이 최근에 제공된 SFMessage 이라고 예상하여 이 리유즈 액티비티를 제공하고 있으나
만일 전송 포멧이 SFMessage 가 아니라면 이 리유즈 액티비티는 사용할 필요가 없으므로 이 클래스 소스를 참고하여
정해진 포멧의 데이타를 생성하는 클래스를 새로 만들어야 합니다.
라이브러리
avro-1.7.7.jar commons-compress-1.4.1.jar iot-site-schema-1.0.0.jar jackson-core-asl-1.9.13.jar jackson-mapper-asl-1.9.13.jar kafka-clients-0.8.2.2.jar lz4-1.2.0.jar paranamer-2.3.jar snappy-java-1.0.5.jar xz-1.0.jar
제공된 gluestd_kafka.zip에는 kafka 전송과 avro 사용에 필요한 기본 라이브러리가 들어 있습니다.
라이브러리 중 iot-site-schema-1.0.0.jar 는 오픈 소스가 아니라 Interface M/W 에서 제공하는 라이브러리 입니다.
iot-site-schema-1.0.0.jar 가 업데이트 되면 Interface M/W를 통해서 꼭 업데이트 하시기 바랍니다.
샘플 프로젝트
제공된 sample-smp.zip에는 message-service.xml을 실행 시켜 kafka에 데이터를 전송하는 TestGlueService.java 와 PosKafkaProducer를 통해서 java소스 상에서 kafka에 데이터를 전송하는 TestKafkaSend.java가 있습니다.