前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Gearman使用范例

Gearman使用范例

作者头像
jeremyxu
发布2018-05-10 18:27:28
1.2K0
发布2018-05-10 18:27:28
举报
文章被收录于专栏:jeremy的技术点滴

Gearman使用范例

Gearman是一个分发任务的程序框架,可以用在各种场合,与Hadoop相比,Gearman更偏向于任务分发功能。它的任务分布非常简单,简单得可以只需要用脚本即可完成。Gearman最初用于LiveJournal的图片resize功能,由于图片resize需要消耗大量计算资源,因此需要调度到后端多台服务器执行,完成任务之后返回前端再呈现到界面。

工程依赖配置

代码语言:javascript
复制
<dependencies>
  <dependency>
    <groupId>org.gearman.jgs</groupId>
    <artifactId>java-gearman-service</artifactId>
    <version>0.7.0-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.4</version>
    <classifier>jdk15</classifier>
  </dependency>
</dependencies>
<repositories>
  <repository>
    <id>aliyun</id>
    <name>aliyun private nexus</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <releases>
      <enabled>true</enabled>
    </releases>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
  <repository>
    <id>jfrog</id>
    <name>jfrog private maven library</name>
    <url>https://oss.jfrog.org/libs-snapshot/</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

Gearman服务端

代码语言:javascript
复制
/*
 * Create a Gearman instance
 */
Gearman gearman = Gearman.createGearman();
try {
    /*
     * Start a new job server. The resulting server will be running in
     * the local address space.
     *
     * Parameter 1: The port number to listen on
     *
     * throws IOException
     */
    GearmanServer server = gearman
            .startGearmanServer(EchoWorker.ECHO_PORT);

    /*
     * Create a gearman worker. The worker poll jobs from the server and
     * executes the corresponding GearmanFunction
     */
} catch (IOException ioe) {
    /*
     * If an exception occurs, make sure the gearman service is shutdown
     */
    gearman.shutdown();
    // forward exception
    throw ioe;
}

这里注意有一个重载的public GearmanServer startGearmanServer(int port, GearmanPersistence persistence),通过它可以将提交的任务持久化,即使Gearman服务端重启,提交的任务还是可以还原的

Gearman客户端与Worker端

Gearman里提交任务有两种方式:非backgroud提交方式、background提交方式。简单来说非backgroud提交方式是一种同步提交方式,客户端提交任务后保持一个长连接,通过这个长连接可以从执行的Function中获得中间任务数据;而background提交方式是一种异步提交方式,客户端提交任务后获得一个jobHandle, 后面都通过jobHandle获取执行Function的任务状态。

非backgroud提交方式

代码语言:javascript
复制
System.out.println(EchoWorker.ECHO_FUNCTION_NAME);
/*
 * Create a Gearman instance
 */
Gearman gearman = Gearman.createGearman();
/*
 * Create a new gearman client.
 *
 * The client is used to submit requests the job server.
 */
GearmanClient client = gearman.createGearmanClient();
/*
 * Create the job server object. This call creates an object represents
 * a remote job server.
 *
 * Parameter 1: the host address of the job server.
 * Parameter 2: the port number the job server is listening on.
 *
 * A job server receives jobs from clients and distributes them to
 * registered workers.
 */
GearmanServer server = gearman.createGearmanServer(
        EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
/*
 * Tell the client that it may connect to this server when submitting
 * jobs.
 */
client.addServer(server);
/*
 * Submit a job to a job server.
 *
 * Parameter 1: the gearman function name
 * Parameter 2: the data passed to the server and worker
 *
 * The GearmanJobReturn is used to poll the job's result
 */
JSONObject json = new JSONObject();
json.put("name", "admin");
GearmanJobReturn jobReturn = client.submitJob(
        EchoWorker.ECHO_FUNCTION_NAME,json.toString().getBytes());
/*
 * Iterate through the job events until we hit the end-of-file
 */
byte[] jobHandle = null;
while (!jobReturn.isEOF()) {

    // Poll the next job event (blocking operation)
    GearmanJobEvent event = jobReturn.poll();

    switch (event.getEventType()) {
        // success
        case GEARMAN_JOB_SUCCESS: // Job completed successfully
            // print the result
            System.out.println(new String(event.getData()));
            break;
        case GEARMAN_SUBMIT_SUCCESS:
            // get job handle
            jobHandle = event.getData();
            break;
        case GEARMAN_JOB_DATA:
            // print job data
            System.out.println(new String(event.getData()));
            break;
        // failure
        case GEARMAN_SUBMIT_FAIL: // The job submit operation failed
        case GEARMAN_JOB_FAIL: // The job's execution failed
            System.err.println(event.getEventType() + ": "
                    + new String(event.getData()));
    }

}
/*
 * Close the gearman service after it's no longer needed. (closes all
 * sub-services, such as the client)
 *
 * It's suggested that you reuse Gearman and GearmanClient instances
 * rather recreating and closing new ones between submissions
 */
gearman.shutdown();

代码语言:javascript
复制
public static void main(String... args) {

    System.out.println(EchoWorker.ECHO_FUNCTION_NAME);
    registWorker();


}

public static void registWorker(){
    /*
     * Create a Gearman instance
     */
    Gearman gearman = Gearman.createGearman();

    /*
     * Create the job server object. This call creates an object represents
     * a remote job server.
     *
     * Parameter 1: the host address of the job server.
     * Parameter 2: the port number the job server is listening on.
     *
     * A job server receives jobs from clients and distributes them to
     * registered workers.
     */
    GearmanServer server = gearman.createGearmanServer(
            EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);

    /*
     * Create a gearman worker. The worker poll jobs from the server and
     * executes the corresponding GearmanFunction
     */
    System.out.println(server.toString());
    GearmanWorker worker = gearman.createGearmanWorker();
    /*
     *  Tell the worker how to perform the echo function
     */
    worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());
    /*
     *  Tell the worker that it may communicate with the this job server
     */
    boolean success = worker.addServer(server);
    System.out.println(success);
}


public byte[] work(String function, byte[] data,
                   GearmanFunctionCallback callback) throws Exception {

    /*
     * The work method performs the gearman function. In this case, the echo
     * function simply returns the data it received
     */

    System.out.println(new String(data));

    System.out.println("begin work");

    callback.sendData("some data".getBytes());

    System.out.println("end work");

    return "job result".getBytes();
}

backgroud提交方式

代码语言:javascript
复制
System.out.println(EchoWorker.ECHO_FUNCTION_NAME);
/*
 * Create a Gearman instance
 */
Gearman gearman = Gearman.createGearman();
/*
 * Create a new gearman client.
 *
 * The client is used to submit requests the job server.
 */
GearmanClient client = gearman.createGearmanClient();
/*
 * Create the job server object. This call creates an object represents
 * a remote job server.
 *
 * Parameter 1: the host address of the job server.
 * Parameter 2: the port number the job server is listening on.
 *
 * A job server receives jobs from clients and distributes them to
 * registered workers.
 */
GearmanServer server = gearman.createGearmanServer(
        EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
/*
 * Tell the client that it may connect to this server when submitting
 * jobs.
 */
client.addServer(server);
/*
 * Submit a job to a job server.
 *
 * Parameter 1: the gearman function name
 * Parameter 2: the data passed to the server and worker
 *
 * The GearmanJobReturn is used to poll the job's result
 */
JSONObject json = new JSONObject();
json.put("name", "admin");
GearmanJobReturn jobReturn = client.submitBackgroundJob(
        EchoWorker.ECHO_FUNCTION_NAME,json.toString().getBytes());
/*
 * Iterate through the job events until we hit the end-of-file
 */
byte[] jobHandle = null;
while (!jobReturn.isEOF()) {
    // Poll the next job event (blocking operation)
    GearmanJobEvent event = jobReturn.poll();
    switch (event.getEventType()) {
        case GEARMAN_SUBMIT_SUCCESS:
            jobHandle = event.getData();
            break;
        // failure
        case GEARMAN_SUBMIT_FAIL: // The job submit operation failed
        case GEARMAN_JOB_FAIL: // The job's execution failed
            System.err.println(event.getEventType() + ": "
                    + new String(event.getData()));
    }
}
for (int i = 0; i < 10; i++) {
    GearmanJobStatus status = client.getStatus(jobHandle);
    System.out.println(String.format("known: %b, running: %b, denominator: %d, numerator: %d", status.isKnown(), status.isRunning(), status.getDenominator(), status.getNumerator()));
    Thread.sleep(2000L);
}
/*
 * Close the gearman service after it's no longer needed. (closes all
 * sub-services, such as the client)
 *
 * It's suggested that you reuse Gearman and GearmanClient instances
 * rather recreating and closing new ones between submissions
 */
gearman.shutdown();

代码语言:javascript
复制
public static void main(String... args) {
    System.out.println(EchoWorker.ECHO_FUNCTION_NAME);
    registWorker();
}
public static void registWorker(){
    /*
     * Create a Gearman instance
     */
    Gearman gearman = Gearman.createGearman();

    /*
     * Create the job server object. This call creates an object represents
     * a remote job server.
     *
     * Parameter 1: the host address of the job server.
     * Parameter 2: the port number the job server is listening on.
     *
     * A job server receives jobs from clients and distributes them to
     * registered workers.
     */
    GearmanServer server = gearman.createGearmanServer(
            EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
    /*
     * Create a gearman worker. The worker poll jobs from the server and
     * executes the corresponding GearmanFunction
     */
    System.out.println(server.toString());
    GearmanWorker worker = gearman.createGearmanWorker();
    /*
     *  Tell the worker how to perform the echo function
     */
    worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());
    /*
     *  Tell the worker that it may communicate with the this job server
     */
    boolean success = worker.addServer(server);
    System.out.println(success);
}
public byte[] work(String function, byte[] data,
                   GearmanFunctionCallback callback) throws Exception {

    /*
     * The work method performs the gearman function. In this case, the echo
     * function simply returns the data it received
     */
    System.out.println(new String(data));
    System.out.println("begin work");
    for (int i = 0; i < 10; i++) {
        callback.sendStatus(i, i);
        Thread.sleep(2000L);
    }
    System.out.println("end work");
    return null;
}

参考

http://www.blogdaren.com/m/?post=1497

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017-09-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Gearman使用范例
    • 工程依赖配置
      • Gearman服务端
        • Gearman客户端与Worker端
          • 非backgroud提交方式
          • backgroud提交方式
        • 参考
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档