Home >> SmartFactory NonUI 
+- 1. 파생항목 
+- 2. 메시지생성 
+- 3. Send TC to Kafka 
+- 4. Skip TC

kafka 데이터 전송

  1. applicationContext.xml

    Glue 기반에서 kafka에 데이터를 보내는 경우 applicationContext.xml에 PosKafkaProducer 클래스가 등록되어 있어야 합니다.

    <bean id="kafkaSender" class="com.posco.reuse.kafka.clients.PosKafkaProducer">
        <property name="propertiesMap">
            <map>
                <entry key="bootstrap.servers" value="192.168.193.141:9092,192.168.193.142:9092" />
            </map>
        </property>
    </bean>
    

    PosKafkaProducer는 kafka에서 제공하는 KafkaProducer를 이용하는데 KafkaProducer에 설정되는 property값을 propertiesMap을 통해서 설정할 수 있습니다. kafka에서 제공하는 api라이브러리는 JDK 1.7 이상에서만 동작합니다.

    PosKafkaProducer 를 통에서 kafka에 데이터를 보내는 테스트용 소스

    PosKafkaProducer pro = (PosKafkaProducer) GlueStaticContext.getBeanFactory().getBeanObject("kafkaSender");
    
    SFMessage.Builder builder = SFMessage.newBuilder();
    
    builder.setTcID( "GlueTest1" );
    
    List<CharSequence> keys = new ArrayList<CharSequence>();
    keys.add( "key1" );
    builder.setKeys( keys );
    
    Map<CharSequence, List<CharSequence>> datas = new LinkedHashMap<CharSequence, List<CharSequence>>();
    
    List<CharSequence> data1 = new ArrayList<CharSequence>();
    data1.add( "data1" );
    
    datas.put( "key", data1 );
    
    builder.setDatas( datas );
    
    try {
      pro.sendMessage("GLUE_TEST03", AvroHelper.marshalAvro( SFMessage.getClassSchema(), builder.build() ));
      Thread.sleep(1000);
    } catch (Exception e) {
      e.printStackTrace();
    }
    

    PosKafkaProducer에서 sendMessage를 통해서 kafka에 데이타를 보내는 로직은 메인 쓰레드 이외의 별도 쓰레드에서 실행되는 async 방식으로 실행되며 데이타 전송이 완료 되면 아래와 같은 로그가 info 레벨에서 남습니다.

    "message(" + messageId + ") sent to partition(" + metadata.partition() + "), " + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"
    

    전송할 곳의 kafka의 버전과 Glue에서 사용하는 kafka 라이브러리의 버전이 일치하지 않으면 데이타 전송시 에러가 발생하므로 버전을 꼭 확인해야 함.
    데이터 전송을 kafka tool을 통해서 할 경우 해당 topic에서 Messages의 [Total number of messages]의 값은 2건 이상의 데이터가 왔을때 부터 정확히 표시됨을 유의해야 합니다.

    수신된 내용을 단순 건수가 아니라 구체적인 내용이 보고 싶으신 경우에는 RTP 의 도움을 받으면 될것으로 보입니다.

  2. SFMessageSend

    PosKafkaProducer 를 통에서 byte[] 데이타를 Kafka에 송신하는 기능을 수행하는 리유즈 액티비티

    Activity Property

    • sender-id : (필수) applicationContext.xml에 등록되어 있는 PosKafkaProducer bean id.
    • message-key : (필수) GlueContext에 등록되어 있는 byte[] 객체의 Key
    • topic-key : (선택) Kafka의 topic 값이 등록되어 있는 GlueContext 의 Key
    • topic : (선택) Kafka의 topic값
    1. 사용 예# 1

      Kafka의 GLUE_TEST04라는 topic으로 데이타를 송신하는 경우
      applicationContext.xml에 등록되어 있는 PosKafkaProducer bean id 는 kafkaSender이고
      Kafka에 전송할 데이터는 sfmessage를 Key로 GlueContext에 등록되어 있는 경우

      <activity name="sendMessage" class="com.posco.reuse.kafka.activity.SFMessageSend">
          <transition name="success" value="end"/>
          <property name="sender-id" value="kafkaSender"/>
          <property name="topic" value="GLUE_TEST04"/>
          <property name="message-key" value="sfmessage"/>
      </activity>
      
    2. 사용 예# 2

      Kafka의 topic 값이 GlueContext에 topic을 Key로 등록되어 있는 경우

      <activity name="sendMessage" class="com.posco.reuse.kafka.activity.SFMessageSend">
          <transition name="success" value="end"/>
          <property name="sender-id" value="kafkaSender"/>
          <property name="topic-key" value="topic"/>
          <property name="message-key" value="sfmessage"/>
      </activity>
      
  3. SFMessageCreate

    avro형식의 SFMessage형의 객체를 생성하여 해당 객체를 byte[]로 변환하여 GlueContext에 저장하는 리유즈 액티비티

    Activity Property

    • result-key: (필수) 생성된 avro 데이터를 GlueContext에 등록하는 Key
    • tc-id: (선택) SFMessage 데이터의 TC-ID 값
    • keys: (선택) FMessage 데이터의 keys 값. 리스트형 데이타로 | 를 구분자로 입력(예: a|b|c)
    • keys-ctx: (선택) FMessage 데이터의 keys 값. GlueContext에 String[] 형으로 등록되어 있는 경우 해당 객체의 Key
    • datas-keys: (선택) FMessage 데이터의 datas에서 Map의 Key 값. 리스트형 데이타로 | 를 구분자로 입력(예: a|b|c)
    • datas-keys-ctx: (선택) FMessage 데이터의 datas에서 Map의 Key 값. GlueContext에 String[] 형으로 등록되어 있는 경우 해당 객체의 Key
    • data#(data1,data2...) : (선택) FMessage 데이터의 datas에서 Map의 value 값. Key 값이 정의된 순서대로 맵핑
    • data#-ctx(data1-ctx,data2-ctx...) : (선택) FMessage 데이터의 datas에서 Map의 value 값을 가져 올 GlueContext의 Key.
    • glue-message-convert : (선택) GlueContext의 GlueMessage를 변환하는 경우 입력(입력 값 : true)
    1. 사용 예# 1
      <activity name="createMessage01" class="com.posco.reuse.kafka.activity.SFMessageCreate">
          <transition name="success" value="sendMessage"/>
          <property name="keys" value="a|b|c|d|e"/>
          <property name="data1" value="d1|d1-1|d1-2"/>
          <property name="data2" value="d2"/>
          <property name="datas-keys" value="a1|a2|a3|a4"/>
          <property name="data3" value="d3"/>
          <property name="tc-id" value="Test01"/>
          <property name="data4" value="d4"/>
          <property name="result-key" value="sfmessage"/>
      </activity>
      
      • 테스트 실행 시 GlueContext 에 담기는 데이타 : 필요 없음
      • 생성된 SFMessage데이터
        tcid : Test01
        keys : [a, b, c, d, e]
        datas : {a1=[d1, d1-1, d1-2], a2=[d2], a3=[d3], a4=[d4]}
        
    2. 사용 예# 2
      <activity name="createMessage02" class="com.posco.reuse.kafka.activity.SFMessageCreate">
          <transition name="success" value="sendMessage"/>
          <property name="data1-ctx" value="dc1|dc1-1|dc1-2"/>
          <property name="data2-ctx" value="dc2"/>
          <property name="datas-keys-ctx" value="datakeys"/>
          <property name="data3-ctx" value="dc3"/>
          <property name="data4-ctx" value="dc4"/>
          <property name="tc-id" value="Test02"/>
          <property name="keys-ctx" value="testkeys"/>
          <property name="result-key" value="sfmessage"/>
      </activity>
      
      • 테스트 실행 시 GlueContext 에 담기는 데이타
        String[] testkeys = {"t1","t2","t3"};
        String[] datakeys = {"d1","d2","d3","d4"};
        ctx.put( "testkeys", testkeys );
        ctx.put( "datakeys", datakeys );
        ctx.put( "dc1", "01" );
        ctx.put( "dc2", "02" );
        ctx.put( "dc3", "03" );
        ctx.put( "dc4", "04" );
        ctx.put( "dc1-1", "01-1" );
        ctx.put( "dc1-2", "01-2" );
        
      • 생성된 SFMessage데이터
        tcid : Test02
        keys : [t1, t2, t3]
        datas : {d1=[01, 01-1, 01-2], d2=[02], d3=[03], d4=[04]}
        
    3. 사용 예# 3
      <activity name="createMessage03" class="com.posco.reuse.kafka.activity.SFMessageCreate">
          <transition name="success" value="sendMessage"/>
          <property name="glue-message-convert" value="true"/>
          <property name="result-key" value="sfmessage"/>
      </activity>
      
      • 테스트 실행 시 GlueContext 에 담기는 데이타
        GlueMessage message = new  GlueMESMessageImpl();
        message.setTCID("Test03");
        message.put("name", "cho");
        message.put("addr", "buchoon1234567890");;
        message.put("phonenum", "01099991234");
        message.put("num1", new BigDecimal("12.345"));
        message.put("time", new Timestamp(System.currentTimeMillis()));
        List<Object> newGroupAttrs = new ArrayList<Object>();
        newGroupAttrs.add("groupvalue");
        newGroupAttrs.add(new BigDecimal("34.789"));
        newGroupAttrs.add(new Timestamp(System.currentTimeMillis()));
        message.put("group", newGroupAttrs);
        ctx.setMessage(message);
        
      • 생성된 SFMessage데이터
        tcid : Test03
        keys : [time, name, addr, group, phonenum, num1]
        datas : {addr=[buchoon1234567890], time=[20160304104207], phonenum=[01099991234], group=[groupvalue, 34.789, 20160304104207], num1=[12.345], name=[cho]}
        

    avro - SFMessage 관련 클래스는 iot-site-schema-1.0.0.jar 에 포함되어 있습니다.

    현재 데이터 전송 포멧이 최근에 제공된 SFMessage 이라고 예상하여 이 리유즈 액티비티를 제공하고 있으나

    만일 전송 포멧이 SFMessage 가 아니라면 이 리유즈 액티비티는 사용할 필요가 없으므로 이 클래스 소스를 참고하여

    정해진 포멧의 데이타를 생성하는 클래스를 새로 만들어야 합니다.

  4. 라이브러리

    avro-1.7.7.jar
    commons-compress-1.4.1.jar
    iot-site-schema-1.0.0.jar
    jackson-core-asl-1.9.13.jar
    jackson-mapper-asl-1.9.13.jar
    kafka-clients-0.8.2.2.jar
    lz4-1.2.0.jar
    paranamer-2.3.jar
    snappy-java-1.0.5.jar
    xz-1.0.jar
    

    제공된 gluestd_kafka.zip에는 kafka 전송과 avro 사용에 필요한 기본 라이브러리가 들어 있습니다.
    라이브러리 중 iot-site-schema-1.0.0.jar 는 오픈 소스가 아니라 Interface M/W 에서 제공하는 라이브러리 입니다.
    iot-site-schema-1.0.0.jar 가 업데이트 되면 Interface M/W를 통해서 꼭 업데이트 하시기 바랍니다.

  5. 샘플 프로젝트

    제공된 sample-smp.zip에는 message-service.xml을 실행 시켜 kafka에 데이터를 전송하는 TestGlueService.java 와 PosKafkaProducer를 통해서 java소스 상에서 kafka에 데이터를 전송하는 TestKafkaSend.java가 있습니다.