Trigger Cloud Composer DAGs With Java

Liferay Analytics Cloud heavily relies on Apache Airflow for orchestrating its backend operations. While most of these tasks are scheduled, one recent requirement involved triggering a Directed Acyclic Graph (DAG) in response to a specific event within a Java process.

The most direct method to trigger an event-based job is through the direct invocation of Apache Airflow's REST APIs.

Since there's not much documentation available on how to accomplish this task it in Java. I thought I could share some snippets in case someone faces the same task.

It's also worth to mention that we operate Apache Airflow via Cloud Composer. The classes dependencies bellow are from Google's HTTP Java Client and Google Auth Library.

If your container holds the Google Credentials environment variables, the following code will manage the request authentication for you:

GoogleCredentials credentials =
    GoogleCredentials.getApplicationDefault();

credentials = credentials.createScoped(
    "https://www.googleapis.com/auth/cloud-platform");

NetHttpTransport netHttpTransport = new NetHttpTransport();

HttpRequestFactory requestFactory =
    netHttpTransport.createRequestFactory(
       new HttpCredentialsAdapter(credentials));

HttpRequest httpRequest = requestFactory.buildPostRequest(
    new GenericUrl(
       _composerEndpoint + "/api/v1/dags/" + dagId + "/dagRuns"),
    ByteArrayContent.fromString(
       "application/json",
       JSONUtil.put(
          "logical_date", DateUtil.newDateString()
       ).toString()));

HttpHeaders httpHeaders = httpRequest.getHeaders();

httpHeaders.setContentType("application/json");

HttpResponse httpResponse = httpRequest.execute();

if (httpResponse.getStatusCode() != 200) {
    _log.error(
       String.format(
          "Unexpected error after triggering DAG %s " +
             "Status code: %s",
          dagId, httpResponse.getStatusCode()));
}

Hope this helps anyone out there!

Regards