Java 中调用Kettle 4.4的jar包执行转换文件

今天使用4.4发现跟以前的3.2还是有了些不同,这里给出一个JAVA调用api例子,例子里面包括了:

1、自定义kettle的全局配置文件目录,即KETTLE_HOME,这样就可以自己随意指定配置目录

2、能打印出对应的执行日志,然后report出来。

3、所需要的jar包:

 

jar

下面是源码:

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.exception.KettleException; 
import org.pentaho.di.core.logging.CentralLogStore;
import org.pentaho.di.core.logging.Log4jBufferAppender;
import org.pentaho.di.core.logging.LogLevel; 
import org.pentaho.di.repository.Repository;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

/**
 * 
 * @author ahuoo
 *
 */
public class ExecuteTransformationUtil {

	private static Log log = LogFactory.getLog(ExecuteTransformationUtil.class);

	/**
	 * 
	 * @param filename
	 * @param params  you can set your parameters in SQL, such as ${beginDate}
	 * @return
	 * @throws KettleException
	 */
	public Result runTransformation(String filename,Map<String, String> params,boolean debug) throws KettleException { 

		log.info("***************************************************************************************");
		log.info("Attempting to run transformation " + filename	+ " from file system");
		log.info("***************************************************************************************n"); 
		TransMeta transMeta = new TransMeta(filename, (Repository) null);

		if(params!=null){	
			log.info("Attempting to read and set named parameters");
			String[] declaredParameters = transMeta.listParameters();
			for (int i = 0; i < declaredParameters.length; i++) {
				String parameterName = declaredParameters[i]; 
				String description = transMeta.getParameterDescription(parameterName);
				String defaultValue = transMeta.getParameterDefault(parameterName);
				String parameterValue = params.get(parameterName)==null ? defaultValue : params.get(parameterName);

				String output = "Setting parameter " + parameterName + " to ""
						+ parameterValue + "" [description: "" + description
						+ "", default: "" + defaultValue + ""]";
				log.info(output);

				transMeta.setParameterValue(parameterName, parameterValue);
			}
		}
		Trans transformation = new Trans(transMeta); 
		if(debug){
			transformation.setLogLevel(LogLevel.DEBUG); 
		}
		log.info("Starting transformation");
		transformation.execute(new String[0]);
		transformation.waitUntilFinished();
		Result result = transformation.getResult(); 
		log.info("Trans "	+ filename	+ " executed " + (result.getNrErrors() == 0 ? "successfullyn" : "with " + result.getNrErrors() + " errorsn"));

		return result; 		
	}

	/**
	 * @param args
	 * @throws KettleException 
	 */
	public static void main(String[] args) throws KettleException {
		System.getProperties().setProperty("KETTLE_HOME","c:"+ System.getProperty("file.separator") +"home"); 
		KettleEnvironment.init();

		ExecuteTransformationUtil executor = new ExecuteTransformationUtil();
		HashMap<String,String> params = new HashMap<String,String>();
		params.put("statementId", "12");
		Result result = executor.runTransformation("c:/tt.ktr", params,false);

		Log4jBufferAppender appender = CentralLogStore.getAppender();
		String logText = appender.getBuffer(result.getLogChannelId(), false).toString();
		log.info(logText);
	}

}

 

4、转换里面的参数怎么动态设置和使用,见下图: 

-Table Input参数替换 ,设计器右键,点击Transformation setting

1

 

 

 

 

2

 

-Execute SQL statements参数替换,注意红色的标注

executeSql

 

5、转换里面的 Executing SQL script要小心使用,有时候它不是按照你设计的顺序执行的,原以为4.4解决了,发现还是存在这个问题,有些人说在它的后面加Blocking Step就可以解决问题了,不是很放心,还是先看看其源码,大家可以打开4.4的code:

public class Trans implements VariableSpace, NamedParams, HasLogChannelInterface, LoggingObjectInterface
{
  public void startThreads() throws KettleException
    {......
      case Normal:

	        // Now start all the threads...
	    	//
	        for (int i=0;i<steps.size();i++)
	        {
	        	StepMetaDataCombi combi = steps.get(i);
	        	RunThread runThread = new RunThread(combi);
	        	Thread thread = new Thread(runThread);
	        	thread.setName(getName()+" - "+combi.stepname);
	        	thread.start();//此处开始执行线程
	        }
	        break;

        case SerialSingleThreaded://此处是4.4新加的 单线程执行
            new Thread(new Runnable() {
                public void run() {
                    try {

 

public class RunThread implements Runnable {
          public void run() {
		try
		{
			step.setRunning(true);
			if (log.isDetailed()) log.logDetailed(BaseMessages.getString(PKG, "System.Log.StartingToRun")); //$NON-NLS-1$

			while (step.processRow(meta, data) && !step.isStopped());//执行每个step
		}
...

}

通过查看源码发现,Normal情况下其实转换里面是并发执行的。综上所述,要想使transformation的每个step按设计的顺序执行,最好在在transformation配置里面,将Transformation engine type 设置成 Serial Single Threaded (Experimental!),根据字面意思,貌似这个功能还是实验产品,看来后续有可能进一步完善。

 

 

转载自:http://www.ahuoo.com/?p=1467

 

已有5 条评论

  1. 对于kettle本地保存的文件 用Java代码,来调用.kettle文件里一个步骤,运行结果报
    ERROR 14-10 12:36:20,672 – Excel输入 – java.lang.NoClassDefFoundError: org/apache/poi/ss/usermodel/WorkbookFactory
    这里是报啥错? 没有引入jar包?
    对于kettle保存的转换文件,里面的形式不就是xml的保存方式。。。。

  2. java.lang.NoSuchMethodError: org.apache.poi.ss.usermodel.Sheet.getSheetName()Ljava/lang/String;
    把所有包都找齐了, 还是报这个错误。 Sheet 抽象接口有这个方法啊。

  3. 使用中遇到异常,异常信息:database type with plugin id [Oracle] couldn’t be found!,增加初始化语句KettleEnvironment.init();解决,其它都参考以上源码,可以正常使用。
    非常感谢作者分享。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>