Gearman使用范例

Gearman使用范例

Gearman是一个分发任务的程序框架,可以用在各种场合,与Hadoop相比,Gearman更偏向于任务分发功能。它的任务分布非常简单,简单得可以只需要用脚本即可完成。Gearman最初用于LiveJournal的图片resize功能,由于图片resize需要消耗大量计算资源,因此需要调度到后端多台服务器执行,完成任务之后返回前端再呈现到界面。

工程依赖配置

<dependencies>
  <dependency>
    <groupId>org.gearman.jgs</groupId>
    <artifactId>java-gearman-service</artifactId>
    <version>0.7.0-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.4</version>
    <classifier>jdk15</classifier>
  </dependency>
</dependencies>
<repositories>
  <repository>
    <id>aliyun</id>
    <name>aliyun private nexus</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <releases>
      <enabled>true</enabled>
    </releases>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
  <repository>
    <id>jfrog</id>
    <name>jfrog private maven library</name>
    <url>https://oss.jfrog.org/libs-snapshot/</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

Gearman服务端

/*
 * Create a Gearman instance
 */
Gearman gearman = Gearman.createGearman();
try {
    /*
     * Start a new job server. The resulting server will be running in
     * the local address space.
     *
     * Parameter 1: The port number to listen on
     *
     * throws IOException
     */
    GearmanServer server = gearman
            .startGearmanServer(EchoWorker.ECHO_PORT);

    /*
     * Create a gearman worker. The worker poll jobs from the server and
     * executes the corresponding GearmanFunction
     */
} catch (IOException ioe) {
    /*
     * If an exception occurs, make sure the gearman service is shutdown
     */
    gearman.shutdown();
    // forward exception
    throw ioe;
}

这里注意有一个重载的public GearmanServer startGearmanServer(int port, GearmanPersistence persistence),通过它可以将提交的任务持久化,即使Gearman服务端重启,提交的任务还是可以还原的

Gearman客户端与Worker端

Gearman里提交任务有两种方式:非backgroud提交方式、background提交方式。简单来说非backgroud提交方式是一种同步提交方式,客户端提交任务后保持一个长连接,通过这个长连接可以从执行的Function中获得中间任务数据;而background提交方式是一种异步提交方式,客户端提交任务后获得一个jobHandle, 后面都通过jobHandle获取执行Function的任务状态。

非backgroud提交方式

System.out.println(EchoWorker.ECHO_FUNCTION_NAME);
/*
 * Create a Gearman instance
 */
Gearman gearman = Gearman.createGearman();
/*
 * Create a new gearman client.
 *
 * The client is used to submit requests the job server.
 */
GearmanClient client = gearman.createGearmanClient();
/*
 * Create the job server object. This call creates an object represents
 * a remote job server.
 *
 * Parameter 1: the host address of the job server.
 * Parameter 2: the port number the job server is listening on.
 *
 * A job server receives jobs from clients and distributes them to
 * registered workers.
 */
GearmanServer server = gearman.createGearmanServer(
        EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
/*
 * Tell the client that it may connect to this server when submitting
 * jobs.
 */
client.addServer(server);
/*
 * Submit a job to a job server.
 *
 * Parameter 1: the gearman function name
 * Parameter 2: the data passed to the server and worker
 *
 * The GearmanJobReturn is used to poll the job's result
 */
JSONObject json = new JSONObject();
json.put("name", "admin");
GearmanJobReturn jobReturn = client.submitJob(
        EchoWorker.ECHO_FUNCTION_NAME,json.toString().getBytes());
/*
 * Iterate through the job events until we hit the end-of-file
 */
byte[] jobHandle = null;
while (!jobReturn.isEOF()) {

    // Poll the next job event (blocking operation)
    GearmanJobEvent event = jobReturn.poll();

    switch (event.getEventType()) {
        // success
        case GEARMAN_JOB_SUCCESS: // Job completed successfully
            // print the result
            System.out.println(new String(event.getData()));
            break;
        case GEARMAN_SUBMIT_SUCCESS:
            // get job handle
            jobHandle = event.getData();
            break;
        case GEARMAN_JOB_DATA:
            // print job data
            System.out.println(new String(event.getData()));
            break;
        // failure
        case GEARMAN_SUBMIT_FAIL: // The job submit operation failed
        case GEARMAN_JOB_FAIL: // The job's execution failed
            System.err.println(event.getEventType() + ": "
                    + new String(event.getData()));
    }

}
/*
 * Close the gearman service after it's no longer needed. (closes all
 * sub-services, such as the client)
 *
 * It's suggested that you reuse Gearman and GearmanClient instances
 * rather recreating and closing new ones between submissions
 */
gearman.shutdown();

public static void main(String... args) {

    System.out.println(EchoWorker.ECHO_FUNCTION_NAME);
    registWorker();


}

public static void registWorker(){
    /*
     * Create a Gearman instance
     */
    Gearman gearman = Gearman.createGearman();

    /*
     * Create the job server object. This call creates an object represents
     * a remote job server.
     *
     * Parameter 1: the host address of the job server.
     * Parameter 2: the port number the job server is listening on.
     *
     * A job server receives jobs from clients and distributes them to
     * registered workers.
     */
    GearmanServer server = gearman.createGearmanServer(
            EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);

    /*
     * Create a gearman worker. The worker poll jobs from the server and
     * executes the corresponding GearmanFunction
     */
    System.out.println(server.toString());
    GearmanWorker worker = gearman.createGearmanWorker();
    /*
     *  Tell the worker how to perform the echo function
     */
    worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());
    /*
     *  Tell the worker that it may communicate with the this job server
     */
    boolean success = worker.addServer(server);
    System.out.println(success);
}


public byte[] work(String function, byte[] data,
                   GearmanFunctionCallback callback) throws Exception {

    /*
     * The work method performs the gearman function. In this case, the echo
     * function simply returns the data it received
     */

    System.out.println(new String(data));

    System.out.println("begin work");

    callback.sendData("some data".getBytes());

    System.out.println("end work");

    return "job result".getBytes();
}

backgroud提交方式

System.out.println(EchoWorker.ECHO_FUNCTION_NAME);
/*
 * Create a Gearman instance
 */
Gearman gearman = Gearman.createGearman();
/*
 * Create a new gearman client.
 *
 * The client is used to submit requests the job server.
 */
GearmanClient client = gearman.createGearmanClient();
/*
 * Create the job server object. This call creates an object represents
 * a remote job server.
 *
 * Parameter 1: the host address of the job server.
 * Parameter 2: the port number the job server is listening on.
 *
 * A job server receives jobs from clients and distributes them to
 * registered workers.
 */
GearmanServer server = gearman.createGearmanServer(
        EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
/*
 * Tell the client that it may connect to this server when submitting
 * jobs.
 */
client.addServer(server);
/*
 * Submit a job to a job server.
 *
 * Parameter 1: the gearman function name
 * Parameter 2: the data passed to the server and worker
 *
 * The GearmanJobReturn is used to poll the job's result
 */
JSONObject json = new JSONObject();
json.put("name", "admin");
GearmanJobReturn jobReturn = client.submitBackgroundJob(
        EchoWorker.ECHO_FUNCTION_NAME,json.toString().getBytes());
/*
 * Iterate through the job events until we hit the end-of-file
 */
byte[] jobHandle = null;
while (!jobReturn.isEOF()) {
    // Poll the next job event (blocking operation)
    GearmanJobEvent event = jobReturn.poll();
    switch (event.getEventType()) {
        case GEARMAN_SUBMIT_SUCCESS:
            jobHandle = event.getData();
            break;
        // failure
        case GEARMAN_SUBMIT_FAIL: // The job submit operation failed
        case GEARMAN_JOB_FAIL: // The job's execution failed
            System.err.println(event.getEventType() + ": "
                    + new String(event.getData()));
    }
}
for (int i = 0; i < 10; i++) {
    GearmanJobStatus status = client.getStatus(jobHandle);
    System.out.println(String.format("known: %b, running: %b, denominator: %d, numerator: %d", status.isKnown(), status.isRunning(), status.getDenominator(), status.getNumerator()));
    Thread.sleep(2000L);
}
/*
 * Close the gearman service after it's no longer needed. (closes all
 * sub-services, such as the client)
 *
 * It's suggested that you reuse Gearman and GearmanClient instances
 * rather recreating and closing new ones between submissions
 */
gearman.shutdown();

public static void main(String... args) {
    System.out.println(EchoWorker.ECHO_FUNCTION_NAME);
    registWorker();
}
public static void registWorker(){
    /*
     * Create a Gearman instance
     */
    Gearman gearman = Gearman.createGearman();

    /*
     * Create the job server object. This call creates an object represents
     * a remote job server.
     *
     * Parameter 1: the host address of the job server.
     * Parameter 2: the port number the job server is listening on.
     *
     * A job server receives jobs from clients and distributes them to
     * registered workers.
     */
    GearmanServer server = gearman.createGearmanServer(
            EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
    /*
     * Create a gearman worker. The worker poll jobs from the server and
     * executes the corresponding GearmanFunction
     */
    System.out.println(server.toString());
    GearmanWorker worker = gearman.createGearmanWorker();
    /*
     *  Tell the worker how to perform the echo function
     */
    worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());
    /*
     *  Tell the worker that it may communicate with the this job server
     */
    boolean success = worker.addServer(server);
    System.out.println(success);
}
public byte[] work(String function, byte[] data,
                   GearmanFunctionCallback callback) throws Exception {

    /*
     * The work method performs the gearman function. In this case, the echo
     * function simply returns the data it received
     */
    System.out.println(new String(data));
    System.out.println("begin work");
    for (int i = 0; i < 10; i++) {
        callback.sendStatus(i, i);
        Thread.sleep(2000L);
    }
    System.out.println("end work");
    return null;
}

参考

http://www.blogdaren.com/m/?post=1497

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏bboysoul

linux编译安装apache

wget http://mirrors.ustc.edu.cn/apache/httpd/httpd-2.4.25.tar.gz tar -zxvf http...

2463
来自专栏Netkiller

怎样制作RPM包

怎样制作RPM包 摘要 我在网上找RPM包的制作例子几乎都是C源码编译安装然后生成RPM包, 而我的程序不是C写的很多时候是脚本语言如Python, PHP 甚...

6986
来自专栏技术博文

Linux命令英文全称

su:Swith user  切换用户,切换到root用户 cat: Concatenate  串联 uname: Unix name  系统名称 df: Di...

3915
来自专栏蓝天

零停重启程序工具Huptime研究

零停重启目标程序,比如一个网络服务程序,不用丢失和中断任何消息实现重新启动,正在处理的消息也不会中断和丢失,重启的方法是给目标程序的进程发SIGHUP信号。...

851
来自专栏蓝天

Redis模块开发示例

实现一个Redis module,支持两个扩展命令: 1) 可同时对hash的多个field进行incr操作; 2) incrby同时设置一个key的过期时...

1073
来自专栏程序猿

sql 2005 注入语句

[Copy to clipboard]CODE: /**/and/**/(select/**/top/**/1/**/isnull(cast([name]/**...

33710
来自专栏一个会写诗的程序员的博客

Spring Boot 集成 WebFlux 开发 Reactive Web 应用Spring Boot 集成 WebFlux 开发 Reactive Web 应用

IBM的研究称,整个人类文明所获得的全部数据中,有90%是过去两年内产生的。在此背景下,包括NoSQL,Hadoop, Spark, Storm, Kylin在...

1382
来自专栏冷冷

基于Redis实现分布式应用限流

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务。 前几天在DD的公众号,看了一篇关于使用 ...

5778
来自专栏菩提树下的杨过

需要安全认证的远程EJB调用示例(Jboss EAP 6.2环境)

一,Remote EJB 服务接口定义: 1 package yjmyzz.ejb.server.helloworld; 2 3 public interfa...

2425
来自专栏技术总结

献给移动端的服务器搭建

application.properties这个是项目的一些配置,举例一下默认是8080端口,我们如果想改下端口的话,就可以在配置增加

2422

扫码关注云+社区

领取腾讯云代金券