Generate & Submit Task
The quickest way to learn Parallec is to review the example code.
APIs on Request Generation
First, let's look at the APIs that are protocol independent.
- setTargetHostsFrom*() sets the target hosts and is required; details are in the Set Target Hosts section below.
- setConcurrency() controls how fast or slow the requests are sent.
For details on configs, please check here.
API | Required | Default If Not Set | Details |
---|---|---|---|
setTargetHostsFrom*() | Required | No default. Must be set. | Sets the target host list from a string, a list, line-by-line text, a JSON path, or a CMS query, from a local or remote source. |
setProtocol() | Optional | HTTP | Normally there is no need to set this: calling prepare*() already sets it. For HTTPS, you need to set it explicitly. |
setConcurrency() | Optional | concurrencyDefault (1000) | The concurrency level. With a low value you can still send 100,000 requests, only very slowly. |
setConfig() | Optional | default values | Configs for the various timeouts, whether to auto save responses, and whether to enable the response. This can set all the task-level overwritable configurations. |
setEnableCapacityAwareTaskScheduler() | Optional | false: not enabled | When enabled, the task waits in a queue until there is capacity to run it. |
async() | Optional | false, default to sync mode. | Runs the parallel task asynchronously. You may check later whether the task has completed. |
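To make the concurrency setting concrete, here is a minimal sketch of capping the number of in-flight requests with a semaphore. It is not Parallec's implementation: the class BoundedSender, its method sendAll, the pool size of 16, and the 1 ms sleep that stands in for a network call are all assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper, not part of Parallec: caps the number of in-flight requests. */
final class BoundedSender {
    /** Largest number of requests observed in flight at once during the last run. */
    static final AtomicInteger maxInFlight = new AtomicInteger();

    /** Sends totalRequests fake requests, at most `concurrency` at a time; returns how many completed. */
    static int sendAll(int totalRequests, int concurrency) {
        maxInFlight.set(0);
        Semaphore permits = new Semaphore(concurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(16);
        try {
            for (int i = 0; i < totalRequests; i++) {
                permits.acquire(); // blocks the submitter once `concurrency` requests are in flight
                pool.execute(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        maxInFlight.accumulateAndGet(now, Math::max);
                        Thread.sleep(1); // stands in for the network round trip
                        completed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending", e);
        } finally {
            pool.shutdownNow();
        }
        return completed.get();
    }
}
```

With a small concurrency value every request is still sent, but the whole run takes longer, which is the trade-off the setConcurrency() row describes.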
Capacity aware scheduler
When enabled, a task is pushed to the wait queue instead of being executed immediately. A daemon thread (created by newSingleThreadScheduledExecutor) checks every 0.5 seconds whether there is capacity to run a task from the wait queue.
This is useful for protecting the application when there are multiple concurrent ParallelTasks, each with a high concurrency requirement.
By default this scheduler is disabled. It is unnecessary unless the load is high or tasks are submitted outside of your control (e.g. when serving as a server).
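The wait-queue idea can be sketched with a plain queue and a single-threaded scheduled executor. This is only an illustration under stated assumptions, not Parallec's scheduler: the class CapacityAwareQueue, the nested WaitingTask type, the FIFO admission rule, and the notion of a fixed total capacity are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch of a capacity-aware wait queue; not Parallec's actual scheduler. */
final class CapacityAwareQueue {
    /** A waiting task, described only by its name and the concurrency it needs. */
    static final class WaitingTask {
        final String name;
        final int concurrency;
        WaitingTask(String name, int concurrency) {
            this.name = name;
            this.concurrency = concurrency;
        }
    }

    private final Queue<WaitingTask> waitQueue = new ArrayDeque<>();
    private final int totalCapacity;
    private final Consumer<WaitingTask> launcher;
    private int usedCapacity = 0;

    CapacityAwareQueue(int totalCapacity, Consumer<WaitingTask> launcher) {
        this.totalCapacity = totalCapacity;
        this.launcher = launcher;
    }

    /** Tasks are queued rather than executed immediately. */
    synchronized void submit(WaitingTask task) { waitQueue.add(task); }

    /** Called when a running task finishes and gives its capacity back. */
    synchronized void release(WaitingTask task) { usedCapacity -= task.concurrency; }

    /** One capacity check: launches queued tasks in FIFO order while the head task fits. */
    synchronized int checkOnce() {
        int launched = 0;
        while (!waitQueue.isEmpty()
                && usedCapacity + waitQueue.peek().concurrency <= totalCapacity) {
            WaitingTask next = waitQueue.poll();
            usedCapacity += next.concurrency;
            launcher.accept(next);
            launched++;
        }
        return launched;
    }

    /** Runs checkOnce() every 500 ms on a single daemon thread. */
    ScheduledExecutorService startDaemon() {
        ScheduledExecutorService daemon = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "capacity-check");
            t.setDaemon(true);
            return t;
        });
        daemon.scheduleAtFixedRate(this::checkOnce, 0, 500, TimeUnit.MILLISECONDS);
        return daemon;
    }
}
```

Keeping checkOnce() separate from the timer makes the admission logic easy to exercise without waiting for the 500 ms tick.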
APIs on Response Handling
API | Required | Default If Not Set | Details |
---|---|---|---|
handleInWorker() handleInManager() | Optional | handleInManager() | Please check the Response Handler Location section below for details. |
setResponseContext() | Optional | An empty hashmap | Useful when you need to pass arbitrary objects to or from the response handler, e.g. pass in an Elastic Search or Kafka client, or a hashmap to store / process / aggregate the responses. |
execute() | Required | n/a | Key function to execute the parallel task. It will first do a validation on required data before the execution. |
setAutoSaveLogToLocal() | Optional | False | Automatically saves logs to the local file system. By default the logs are written to the "userdata/task/logs" folder. Note that it is the user's responsibility to clean up these logs. |
setSaveResponseToTask() | Optional | False | If true, saves the response to the ParallelTask object. By default, only the status code is saved. |
Response Handler Location
This determines where and when the user-defined response handler's onComplete() function is called.
- Handler in manager (default): calls the response handler in the manager, in sequence after aggregation. In this mode, the user-defined response handler is triggered after the response is passed back from the worker to the manager. Be cautious about long blocking operations in the handler's onComplete() function: every response goes through here, so a long operation may block the whole flow.
- Handler in worker: calls the user-defined onComplete() function in the operation worker, in parallel before aggregation. Be cautious about concurrency / lock control if you save the responses to a common data store. Also, when you define the concurrency level, take into account the time needed to handle the response.
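The caution about a common data store in worker mode can be illustrated with a small, self-contained sketch. It does not use Parallec: the class WorkerSideAggregation and the 8-thread pool that stands in for the operation workers are assumptions. The point is that the shared map must tolerate concurrent updates.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: aggregating responses from handlers that run in parallel. */
final class WorkerSideAggregation {
    /** Counts status codes; each code is handled on a pool thread, like a handler in worker. */
    static Map<Integer, Integer> countStatusCodes(List<Integer> statusCodes) {
        // ConcurrentHashMap.merge is atomic per key, so no external lock is needed.
        Map<Integer, Integer> context = new ConcurrentHashMap<>();
        ExecutorService workers = Executors.newFixedThreadPool(8);
        try {
            for (Integer code : statusCodes) {
                workers.execute(() -> context.merge(code, 1, Integer::sum));
            }
            workers.shutdown();
            workers.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while aggregating", e);
        } finally {
            workers.shutdownNow();
        }
        return context;
    }
}
```

With a plain HashMap the same code could lose updates. In manager mode the handler calls run in sequence, so that particular concern does not arise.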
Set Target Hosts
Parallec provides flexible ways to input multiple target hosts: from a list, a string, line-by-line text, a JSON path, or a CMS query, loaded from local files or remote URLs.
Use the following .setTargetHostsFrom*() functions to set the target hosts.
From Java List
.setTargetHostsFromList(Arrays.asList("www.jeffpei.com", "www.restcommander.com"));
From Java String
From a single string of host names separated by whitespace.
.setTargetHostsFromString("www.jeffpei.com www.restcommander.com");
From Line by Line Text
From a local file containing host names line by line. Relative or absolute paths are both supported.
.setTargetHostsFromLineByLineText("userdata/sample_target_hosts_top100_old.txt",
HostsSourceType.LOCAL_FILE)
You can also load such a file from a web URL. (This will use the apache server)
.setTargetHostsFromLineByLineText("http://www.restcommander.com/docs/sample_target_hosts_top100.txt",
HostsSourceType.URL)
From Json Path
JsonPath is useful for extracting a list of host names from a JSON file.
Here is a sample JSON file that contains host names.
As long as separated by whitespace
String jsonPath = "$.sample.small-target-hosts[*].hostName";
.setTargetHostsFromJsonPath(jsonPath,
"http://parallec.github.io/userdata/sample_target_hosts_json_path.json", HostsSourceType.URL);
You may also load such JSON files from the local file system.
From YiDB/CMS Query
YiDB, a.k.a. CMS (Configuration Management System internally), may store the cloud topology information.
Here is a sample CMS Query Result that contains host names.
Parallec will automatically load target hosts from a CMS query and handle pagination for you.
public final String URL_CMS_QUERY_MULTI_PAGE = "http://parallec.github.io/cms/repositories/cmsdb/branches/main/query/sample_cms_query_results_multi_page_1.json";
.setTargetHostsFromCmsQueryUrl(URL_CMS_QUERY_MULTI_PAGE);
Timeout in URLs
ParallecGlobalConfig defines the timeouts used when loading target hosts from a remote URL. You may change them before use.
/** The url connection connect timeout millis. Used when load target host from URL/CMS*/
public static int urlConnectionConnectTimeoutMillis = 6000;
/** The url connection read timeout millis. Used when load target host from URL/CMS*/
public static int urlConnectionReadTimeoutMillis = 15000;
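For readers unfamiliar with the two timeouts, the sketch below shows what a connect timeout and a read timeout mean on a standard java.net.URLConnection. It is an illustration only and does not claim to be how Parallec loads hosts; the class HostListConnection and its open method are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URLConnection;

/** Hypothetical sketch: applying connect/read timeouts before fetching a host list from a URL. */
final class HostListConnection {
    static URLConnection open(String url, int connectTimeoutMillis, int readTimeoutMillis) {
        try {
            // openConnection() only creates the connection object; no network I/O happens yet.
            URLConnection conn = URI.create(url).toURL().openConnection();
            conn.setConnectTimeout(connectTimeoutMillis); // max time to establish the connection
            conn.setReadTimeout(readTimeoutMillis);       // max time to wait for data once connected
            return conn;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling HostListConnection.open(url, 6000, 15000) mirrors the default values shown above.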
APIs on HTTP
Most of the APIs that set HTTP properties are named setHttp*(), except for setAsyncHttpClient(), which overwrites the asyncHttpClient used.
Set HTTP Method, URL and Protocol
- prepareHttp*() sets the URL and the HTTP method, e.g. prepareHttpGet("/index.html") conducts an HTTP GET on http://[targethost]:/index.html
- For HTTPS, call setProtocol(RequestProtocol.HTTPS)
Set HTTP Header
Here is sample code that sets HTTP headers with the help of ParallecHeader.
.setHttpHeaders(new ParallecHeader().addPair("content-type", "application/json").addPair("key", "value"))
API | Required | Default If Not Set | Details |
---|---|---|---|
prepareHttp*() | Required | n/a | This is the starting point. It sets the HTTP method (e.g. GET/POST/PUT/DELETE) and the URL. |
setHttpHeaders() | Optional | empty | Add headers using ParallecHeader. Check example above. |
setHttpEntityBody() | Optional | empty | For example, a POST body for the request. |
setHttpPort() | Optional | port 80 | Set HTTP port. |
setHttpPollerProcessor() | Optional | empty | Sets the HTTP poller processor to handle async APIs; calling it automatically enables the pollable mode. For details, check here. |
setAsyncHttpClient() | Optional | Embedded fast client from the store | You may overwrite the client with a customized one for each task. The default is the embedded fast client from HttpClientStore. |
saveResponseHeaders() | Optional | false | Save HTTP response headers. Please check ResponseHeaderMeta and the example. |
APIs on SSH
The APIs to set SSH properties are named with setSsh*().
API | Required | Default If Not Set | Details |
---|---|---|---|
prepareSsh() | Required | n/a | Starting point of ssh. Set protocol as SSH. |
setSshCommandLine() | Required | n/a | The command flow you would like to execute. |
setSshPort() | Optional | 22 | The SSH port. |
setSshUserName() | Required | n/a | User name for login. |
setSshLoginType() | Optional | empty | The login is either key based or password based. |
setSshPassword() | Optional | n/a | The SSH login password. Also automatically sets the login type to password. |
setSshConnectionTimeoutMillis() | Optional | 5000 millisec | Connection timeout. Defaults to 5000 millisec in the global config. |
setSshPrivKeyRelativePath() | Optional | n/a | Note that this path must be relative to the project, e.g. "userdata/fake-privkey.txt". This API assumes no passphrase for the private key. Also automatically sets the login type to key based. |
setSshPrivKeyRelativePathWtihPassphrase() | Optional | n/a | Note that this path must be relative to the project. The arguments include the private key path and its passphrase. Also automatically sets the login type to key based. |
APIs on PING
The APIs to set Ping properties are named with setPing*().
Details of the two modes of implementations can be found in PingProvider.java
API | Required | Default If Not Set | Details |
---|---|---|---|
preparePing() | Required | n/a | Starting point of ping. Sets the protocol to "Ping". |
setPingMode() | Optional | InetAddress | Process or InetAddress based. Defaults to InetAddress mode. InetAddress requires root privilege. |
setPingTimeoutMillis() | Optional | 500 | The timeout in milliseconds. |
setPingNumRetries() | Optional | 1 | The number of retries. |
APIs on TCP
The APIs to set TCP properties are named with setTcp*().
API | Required | Default If Not Set | Details |
---|---|---|---|
prepareTcp() | Required | n/a | Starting point of TCP request. Set protocol as "TCP" and the request string. |
setTcpPort() | Required | n/a | A port number server listens on. |
setTcpConnectTimeoutMillis() | Optional | use default 2000 | The connection timeout in milliseconds. |
setTcpIdleTimeoutSec() | Optional | use default 5 | The idle timeout for the channel to close the connection. |
setTcpChannelFactory() | Optional | use the default one | If not set, will use the default one in TcpSshPingResourceStore. |
APIs on UDP
The APIs to set UDP properties are named with setUdp*().
API | Required | Default If Not Set | Details |
---|---|---|---|
prepareUdp() | Required | n/a | Starting point of UDP request. Set protocol as "UDP" and the request string. |
setUdpPort() | Required | n/a | A port number server listens on. |
setUdpIdleTimeoutSec() | Optional | use default 2 | The idle timeout for the channel to close the connection. Similar to a read timeout. |
APIs on Variable Replacement for Heterogeneous Requests
When the protocol is HTTP, the request's entity body, the request URL, and the headers can each be written as a template, with variables denoted by "$VariableName".
More complex replacement samples are available in the test cases.
Different requests to different target hosts
Here is the example of hitting 3 different APIs on 3 different servers. $JOB_ID is the variable being replaced. The API to use is setReplacementVarMapNodeSpecific(). Complete sample code is here.
- http://www.parallec.io/job_a.html
- http://www.jeffpei.com/job_b.html
- http://www.restcommander.com/job_c.html
Map<String, StrStrMap> replacementVarMapNodeSpecific = new HashMap<String, StrStrMap>();
replacementVarMapNodeSpecific.put("www.parallec.io",
new StrStrMap().addPair("JOB_ID", "job_a"));
replacementVarMapNodeSpecific.put("www.jeffpei.com",
new StrStrMap().addPair("JOB_ID", "job_b"));
replacementVarMapNodeSpecific.put("www.restcommander.com",
new StrStrMap().addPair("JOB_ID", "job_c"));
pc.prepareHttpGet("/$JOB_ID.html")
.setTargetHostsFromString(
"www.parallec.io www.jeffpei.com www.restcommander.com")
.setReplacementVarMapNodeSpecific(replacementVarMapNodeSpecific)
.execute(new ParallecResponseHandler() {...}...
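The effect of a node-specific replacement map can be sketched without Parallec as a plain string substitution per host. This is a simplified illustration, not the library's replacement logic: NodeSpecificTemplate, render, and urlsPerHost are hypothetical names, and the literal String.replace used here would mishandle variables whose names are prefixes of one another.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of per-host "$VariableName" replacement; not Parallec's implementation. */
final class NodeSpecificTemplate {
    /** Replaces each "$NAME" in the template with its value from vars (literal replacement). */
    static String render(String template, Map<String, String> vars) {
        String result = template;
        for (Map.Entry<String, String> e : vars.entrySet()) {
            result = result.replace("$" + e.getKey(), e.getValue());
        }
        return result;
    }

    /** Builds one URL per host, each rendered with that host's own variables. */
    static Map<String, String> urlsPerHost(String pathTemplate,
                                           Map<String, Map<String, String>> varsPerHost) {
        Map<String, String> urls = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, String>> e : varsPerHost.entrySet()) {
            urls.put(e.getKey(), "http://" + e.getKey() + render(pathTemplate, e.getValue()));
        }
        return urls;
    }
}
```

Each host looks up its own variable map, so the same path template yields a different request per target.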
Different requests to the same target host
Here is the example of hitting 2 different APIs to the same target host. $ZIP is the variable being replaced. setReplaceVarMapToSingleTargetSingleVar(String variable, List
- http://www.parallec.io/userdata/sample_weather_48824.txt
- http://www.parallec.io/userdata/sample_weather_95037.txt
pc.prepareHttpGet("/userdata/sample_weather_$ZIP.txt")
.setReplaceVarMapToSingleTargetSingleVar("ZIP",
Arrays.asList("95037","48824"), "www.parallec.io")
.setResponseContext(responseContext)
.execute(new ParallecResponseHandler() {...}...
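For the single-target case, one template and a list of values expand into one request per value. The sketch below shows that expansion in isolation; SingleVarExpansion and expand are hypothetical names, and the real library may build its requests differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: one host, one variable, one request URL per value. */
final class SingleVarExpansion {
    static List<String> expand(String host, String pathTemplate,
                               String variable, List<String> values) {
        List<String> urls = new ArrayList<>();
        for (String value : values) {
            // Literal replacement of "$" + variable with the current value.
            urls.add("http://" + host + pathTemplate.replace("$" + variable, value));
        }
        return urls;
    }
}
```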
Regular Expression Response Filter
For use in the response handler, Parallec provides a very simple regular expression based filter class, FilterRegex, to extract strings.
For example, given a ResponseOnSingleTask res, we can parse the response body:
String extractedString = new FilterRegex(
".*<td>JobProgress</td>\\s*<td>(.*?)</td>.*")
.filter(res.getResponseContent());
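To see what such a pattern extracts, here is a minimal sketch with java.util.regex that returns the first capture group. It is not the FilterRegex source: the class RegexExtract, the use of Pattern.DOTALL, and returning null on no match are assumptions made for this example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of first-capture-group extraction; not the FilterRegex source. */
final class RegexExtract {
    /** Returns capture group 1 if the whole input matches the regex, else null. */
    static String firstGroup(String regex, String input) {
        // DOTALL lets "." also match line breaks, so multi-line bodies can match.
        Matcher m = Pattern.compile(regex, Pattern.DOTALL).matcher(input);
        return m.matches() ? m.group(1) : null;
    }
}
```

The lazy group (.*?) stops at the first closing tag after the JobProgress cell, so only that cell's value is captured.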
Async APIs and Auto Progress Polling
Motivation
In many RESTful services today, a job such as "create compute" or "download package" may take an indefinite amount of time, so these APIs are normally designed to be asynchronous: they immediately return a job ID, which you can use to poll for the job's progress and status. To achieve job-level concurrency, it is essential to define a poller, which describes how to poll for progress and when to stop. The attributes of a poller are listed below; please refer to the javadoc for more details.
- The regex to get jobId
- The regex to get the progress
- The progress polling API (a template with the jobId)
- The job completion regex
- The job failure regex
- The polling interval
Sample Server
Please check this example for the complete code. Suppose we have a job submission API that returns a job ID, as in this sample server:
//submit job
HTTP POST: /submitJob return: {"status": "/status/01218499-a5fe-47cf-a0a8-8e9b106c5219", "progress": 0}
//poll progress
HTTP GET: /status/{JobID}
Sample Poller
// Initialize the poller
String pollerType = "CronusAgentPoller";
String successRegex = ".*\"progress\"\\s*:\\s*(100).*}";
String failureRegex = ".*\"error\"\\s*:\\s*(.*).*}";
String jobIdRegex = ".*\"/status/(.*?)\".*";
String progressRegex = ".*\"progress\"\\s*:\\s*([0-9]*).*}";
int progressStuckTimeoutSeconds = 600;
int maxPollError = 5;
long pollIntervalMillis = 2000L;
String jobIdPlaceHolder = "$JOB_ID";
String pollerRequestTemplate = "/status/" + jobIdPlaceHolder;
HttpPollerProcessor httpPollerProcessor = new HttpPollerProcessor(
pollerType, successRegex, failureRegex, jobIdRegex,
progressRegex, progressStuckTimeoutSeconds, pollIntervalMillis,
pollerRequestTemplate, jobIdPlaceHolder, maxPollError);
To enable the poller defined above, simply call .setHttpPollerProcessor(httpPollerProcessor) when building the task. Parallec will then automatically poll the task progress until the job succeeds or fails, which enables job-level concurrency.
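The way the regexes above drive a polling loop can be sketched as follows. This is a simplified illustration rather than HttpPollerProcessor itself: PollingSketch and pollUntilDone are hypothetical, the statusApi function stands in for the HTTP GET, the maxPolls cap replaces the stuck-progress timeout, and no sleeping between polls is done.

```java
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of a regex-driven polling loop; not HttpPollerProcessor itself. */
final class PollingSketch {
    /** Returns capture group 1 if the whole input matches the regex, else null. */
    static String group1(String regex, String input) {
        Matcher m = Pattern.compile(regex, Pattern.DOTALL).matcher(input);
        return m.matches() ? m.group(1) : null;
    }

    /**
     * Polls until success, failure, or maxPolls is reached.
     * statusApi stands in for an HTTP GET: it maps a poll URL to a response body.
     * Returns "SUCCESS", "FAILURE", or "TIMEOUT".
     */
    static String pollUntilDone(String submitResponse,
                                Function<String, String> statusApi, int maxPolls) {
        // Extract the job ID from the submit response, then fill the poll template.
        String jobId = group1(".*\"/status/(.*?)\".*", submitResponse);
        if (jobId == null) {
            return "FAILURE";
        }
        String pollUrl = "/status/$JOB_ID".replace("$JOB_ID", jobId);
        for (int i = 0; i < maxPolls; i++) {
            String body = statusApi.apply(pollUrl);
            if (group1(".*\"error\"\\s*:\\s*(.*).*}", body) != null) {
                return "FAILURE";
            }
            if (group1(".*\"progress\"\\s*:\\s*(100).*}", body) != null) {
                return "SUCCESS";
            }
            // A real poller would wait for the polling interval here.
        }
        return "TIMEOUT";
    }
}
```

Passing the status call in as a function keeps the loop independent of any HTTP client, which also makes it easy to try against canned responses.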