diff --git a/README.md b/README.md index bc12f66..ab80bae 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Keploy Enterprise drives dynamic dedup per testcase. 2. The Java agent resets JaCoCo coverage counters for that testcase. 3. Enterprise replays the testcase. 4. Enterprise sends `END /` on `/tmp/coverage_control.sock`. -5. The Java agent dumps JaCoCo execution data, resolves executed Java lines, and sends them as JSON on `/tmp/coverage_data.sock`. +5. The Java agent dumps JaCoCo execution data and sends the executed probe indices per class (`{className -> [probeIdx]}`) as JSON on `/tmp/coverage_data.sock`. Probes capture branch-level coverage (which branch a test took), so they distinguish tests that run the same lines but take different branches. 6. Enterprise writes the result to `dedupData.yaml` and uses it to identify duplicates. Coverage is collected at per-testcase granularity, not process granularity. diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporter.java b/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporter.java new file mode 100644 index 0000000..c0db8ed --- /dev/null +++ b/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporter.java @@ -0,0 +1,226 @@ +package io.keploy.dedup; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.jacoco.core.analysis.Analyzer; +import org.jacoco.core.analysis.CoverageBuilder; +import org.jacoco.core.analysis.IClassCoverage; +import org.jacoco.core.analysis.ICounter; +import org.jacoco.core.data.ExecutionData; +import org.jacoco.core.data.ExecutionDataStore; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; + +/** + * Offline, one-shot line + branch coverage computation for the dynamic-dedup + * "Option B" flow. 
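Runs OUTSIDE the app JVM (as a short-lived job invoked by + * k8s-proxy at recording stop), so it needs no live JaCoCo agent — only: + * <ul>
+ *   <li>{@code --classes}: the compiled classes directory (or jar) to analyze;</li>
+ *   <li>{@code --union}: a JSON map of class name to the list of fired probe indices;</li>
+ *   <li>{@code --manifest}: a JSON map of class name to its JaCoCo class id (unsigned hex) and probe count.</li>
+ * </ul>
+ *
+ *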

Reconstruction: for each class in the union we build an + * {@link ExecutionData} with the manifest's id + a {@code boolean[probeCount]} + * whose fired indices are set, put it in an {@link ExecutionDataStore}, then run + * the JaCoCo {@link Analyzer} over the classes. The analyzer computes each + * class's CRC64 id from its bytecode and matches it against the store; a class + * with no entry (never hit) is analyzed with all-missed probes so it still + * contributes to the DENOMINATOR. Summing {@code getLineCounter()} / + * {@code getBranchCounter()} across all classes yields the whole test set's + * line + branch coverage. + * + *

Because dedup is coverage-preserving (it only drops exact/subset + * duplicates), the union of the KEPT fingerprints equals the union of every + * replayed test, so this number is the true whole-test-set coverage. + * + *

Output ({@code --out}) is a JSON object: + * {@code {"lineCovered":N,"lineTotal":N,"branchCovered":N,"branchTotal":N, + * "instructionCovered":N,"instructionTotal":N,"methodCovered":N, + * "methodTotal":N,"classCount":N,"hitClassCount":N}}. + */ +public final class CoverageReporter { + + private static final Gson GSON = new Gson(); + + private CoverageReporter() { + } + + /** JaCoCo class id + total probe count for one class (build-constant). */ + static final class ClassMeta { + String id; // CRC64 class id, unsigned-hex (from ExecutionData.getId()) + int probeCount; // ExecutionData.getProbes().length + } + + /** Computed coverage counters, serialized to the --out file. */ + static final class CoverageResult { + long lineCovered, lineTotal; + long branchCovered, branchTotal; + long instructionCovered, instructionTotal; + long methodCovered, methodTotal; + int classCount; // classes analyzed (denominator basis) + int hitClassCount; // classes with at least one fired probe + } + + public static void main(String[] args) { + try { + Args parsed = Args.parse(args); + CoverageResult result = compute(parsed.classesPath, parsed.unionPath, parsed.manifestPath); + String json = GSON.toJson(result); + if (parsed.outPath != null) { + Files.write(parsed.outPath, json.getBytes(StandardCharsets.UTF_8)); + } else { + System.out.println(json); + } + } catch (Exception e) { + System.err.println("CoverageReporter failed: " + e.getMessage()); + e.printStackTrace(); + System.exit(1); + } + } + + /** + * Computes coverage from a classes dir/jar, a fired-probe union, and the + * per-class id/probe-count manifest. Package-visible for unit testing. 
+ */ + static CoverageResult compute(Path classesPath, Path unionPath, Path manifestPath) throws IOException { + Map> union = readUnion(unionPath); + Map manifest = readManifest(manifestPath); + + ExecutionDataStore store = new ExecutionDataStore(); + int hitClasses = 0; + for (Map.Entry entry : manifest.entrySet()) { + String className = entry.getKey(); + ClassMeta meta = entry.getValue(); + if (meta == null || meta.id == null || meta.probeCount < 0) { + continue; + } + boolean[] probes = new boolean[meta.probeCount]; + List fired = union.get(className); + if (fired != null) { + for (Integer idx : fired) { + if (idx != null && idx >= 0 && idx < probes.length) { + probes[idx] = true; + } + } + if (!fired.isEmpty()) { + hitClasses++; + } + } + long id = Long.parseUnsignedLong(meta.id, 16); + store.put(new ExecutionData(id, className, probes)); + } + + CoverageBuilder builder = new CoverageBuilder(); + Analyzer analyzer = new Analyzer(store, builder); + File classesFile = classesPath.toFile(); + if (!classesFile.exists()) { + throw new IOException("classes path does not exist: " + classesPath); + } + // analyzeAll walks a dir tree (or a jar) and analyzes every .class, + // computing each class's CRC64 id and matching it against the store. 
+ analyzer.analyzeAll(classesFile); + + CoverageResult result = new CoverageResult(); + int classCount = 0; + for (IClassCoverage cc : builder.getClasses()) { + classCount++; + result.lineCovered += cc.getLineCounter().getCoveredCount(); + result.lineTotal += cc.getLineCounter().getTotalCount(); + result.branchCovered += cc.getBranchCounter().getCoveredCount(); + result.branchTotal += cc.getBranchCounter().getTotalCount(); + result.instructionCovered += cc.getInstructionCounter().getCoveredCount(); + result.instructionTotal += cc.getInstructionCounter().getTotalCount(); + result.methodCovered += cc.getMethodCounter().getCoveredCount(); + result.methodTotal += cc.getMethodCounter().getTotalCount(); + } + result.classCount = classCount; + result.hitClassCount = hitClasses; + return result; + } + + private static Map> readUnion(Path path) throws IOException { + Type type = new TypeToken>>() { + }.getType(); + Map> union = GSON.fromJson(readString(path), type); + return union == null ? java.util.Collections.>emptyMap() : union; + } + + private static Map readManifest(Path path) throws IOException { + Type type = new TypeToken>() { + }.getType(); + Map manifest = GSON.fromJson(readString(path), type); + if (manifest == null) { + throw new IOException("manifest is empty or invalid: " + path); + } + return manifest; + } + + private static String readString(Path path) throws IOException { + return new String(Files.readAllBytes(path), StandardCharsets.UTF_8); + } + + /** Minimal --flag value arg parser. 
*/ + private static final class Args { + Path classesPath; + Path unionPath; + Path manifestPath; + Path outPath; + + static Args parse(String[] argv) { + Args a = new Args(); + for (int i = 0; i + 1 < argv.length; i += 2) { + String flag = argv[i]; + String val = argv[i + 1]; + switch (flag) { + case "--classes": + a.classesPath = Paths.get(val); + break; + case "--union": + a.unionPath = Paths.get(val); + break; + case "--manifest": + a.manifestPath = Paths.get(val); + break; + case "--out": + a.outPath = Paths.get(val); + break; + default: + throw new IllegalArgumentException("unknown flag: " + flag); + } + } + if (a.classesPath == null || a.unionPath == null || a.manifestPath == null) { + throw new IllegalArgumentException( + "usage: CoverageReporter --classes --union " + + "--manifest [--out ]"); + } + return a; + } + } + + // Referenced to keep the ICounter import meaningful for readers scanning + // deps; getLineCounter()/getBranchCounter() return ICounter instances. + @SuppressWarnings("unused") + private static long coveredOf(ICounter counter) { + return counter.getCoveredCount(); + } +} diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporterSelfTest.java b/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporterSelfTest.java new file mode 100644 index 0000000..7f51d19 --- /dev/null +++ b/keploy-sdk/src/main/java/io/keploy/dedup/CoverageReporterSelfTest.java @@ -0,0 +1,120 @@ +package io.keploy.dedup; + +import org.jacoco.core.analysis.Analyzer; +import org.jacoco.core.analysis.CoverageBuilder; +import org.jacoco.core.analysis.IClassCoverage; +import org.jacoco.core.data.ExecutionData; +import org.jacoco.core.data.ExecutionDataStore; +import org.jacoco.core.tools.ExecFileLoader; + +import java.io.File; + +/** + * Standalone validation for {@link CoverageReporter}: proves that computing + * coverage by reconstructing {@link ExecutionData} from a fired-probe + * union + a per-class (id, probeCount) manifest produces the identical + * 
line/branch numbers as JaCoCo's own analysis of a real {@code .exec} dump. + * + *

Given a real {@code .exec} (produced by running the app under + * {@code -javaagent:jacocoagent.jar}) and the matching classes dir, it: + * <ol>
+ *   <li>computes GROUND TRUTH: {@code Analyzer} over the loaded exec store;</li>
+ *   <li>derives a manifest (id hex + probeCount) and union (fired indices) from
+ *       that same exec — exactly what the SDK would persist;</li>
+ *   <li>REBUILDS an execution store from manifest + union (the
+ *       {@code CoverageReporter} path) and re-analyzes;</li>
+ *   <li>asserts the two coverage results are equal.</li>
+ * </ol>
+ * + *

Usage: {@code java -cp keploy-sdk.jar + * io.keploy.dedup.CoverageReporterSelfTest --exec <exec-file> --classes <classes-dir>

} + */ +public final class CoverageReporterSelfTest { + + private CoverageReporterSelfTest() { + } + + public static void main(String[] args) throws Exception { + String execPath = null; + String classesPath = null; + for (int i = 0; i + 1 < args.length; i += 2) { + if ("--exec".equals(args[i])) { + execPath = args[i + 1]; + } else if ("--classes".equals(args[i])) { + classesPath = args[i + 1]; + } + } + if (execPath == null || classesPath == null) { + System.err.println("usage: CoverageReporterSelfTest --exec --classes "); + System.exit(2); + } + + ExecFileLoader loader = new ExecFileLoader(); + loader.load(new File(execPath)); + ExecutionDataStore realStore = loader.getExecutionDataStore(); + + // (1) GROUND TRUTH — analyze the real exec directly. + long[] truth = analyze(realStore, classesPath); + + // (2) Derive manifest + union from the real exec (what the SDK captures), + // then (3) REBUILD a fresh store from them (the CoverageReporter path). + ExecutionDataStore rebuilt = new ExecutionDataStore(); + int classes = 0; + int firedProbes = 0; + for (ExecutionData data : realStore.getContents()) { + classes++; + boolean[] src = data.getProbes(); + boolean[] copy = new boolean[src.length]; // manifest.probeCount = src.length + for (int i = 0; i < src.length; i++) { + if (src[i]) { // union = fired indices + copy[i] = true; + firedProbes++; + } + } + // manifest.id = data.getId() (round-tripped through unsigned-hex, like the real pipeline) + long id = Long.parseUnsignedLong(Long.toHexString(data.getId()), 16); + rebuilt.put(new ExecutionData(id, data.getName(), copy)); + } + long[] rebuiltCov = analyze(rebuilt, classesPath); + + System.out.println("classes in exec: " + classes + ", fired probes: " + firedProbes); + printRow("GROUND TRUTH (direct exec)", truth); + printRow("REBUILT (manifest+union) ", rebuiltCov); + + boolean equal = true; + for (int i = 0; i < truth.length; i++) { + if (truth[i] != rebuiltCov[i]) { + equal = false; + break; + } + } + if (equal) { 
+ System.out.println("RESULT: PASS — reconstruction matches JaCoCo's direct analysis exactly."); + System.exit(0); + } else { + System.out.println("RESULT: FAIL — reconstruction diverges from ground truth."); + System.exit(1); + } + } + + /** Returns {lineCov, lineTot, branchCov, branchTot, instrCov, instrTot}. */ + private static long[] analyze(ExecutionDataStore store, String classesPath) throws Exception { + CoverageBuilder builder = new CoverageBuilder(); + new Analyzer(store, builder).analyzeAll(new File(classesPath)); + long lc = 0, lt = 0, bc = 0, bt = 0, ic = 0, it = 0; + for (IClassCoverage cc : builder.getClasses()) { + lc += cc.getLineCounter().getCoveredCount(); + lt += cc.getLineCounter().getTotalCount(); + bc += cc.getBranchCounter().getCoveredCount(); + bt += cc.getBranchCounter().getTotalCount(); + ic += cc.getInstructionCounter().getCoveredCount(); + it += cc.getInstructionCounter().getTotalCount(); + } + return new long[]{lc, lt, bc, bt, ic, it}; + } + + private static void printRow(String label, long[] c) { + System.out.printf("%s lines %d/%d branches %d/%d instr %d/%d%n", + label, c[0], c[1], c[2], c[3], c[4], c[5]); + } +} diff --git a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java index 30cd22f..2d24b5c 100644 --- a/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java +++ b/keploy-sdk/src/main/java/io/keploy/dedup/KeployDedupAgent.java @@ -1,10 +1,6 @@ package io.keploy.dedup; import com.google.gson.Gson; -import org.jacoco.core.analysis.Analyzer; -import org.jacoco.core.analysis.CoverageBuilder; -import org.jacoco.core.analysis.IClassCoverage; -import org.jacoco.core.analysis.ICounter; import org.jacoco.core.data.ExecutionData; import org.jacoco.core.data.ExecutionDataStore; import org.jacoco.core.data.ExecutionDataWriter; @@ -27,9 +23,12 @@ import java.lang.instrument.Instrumentation; import java.lang.reflect.InvocationTargetException; import 
java.lang.reflect.Method; +import java.net.HttpURLConnection; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.URL; +import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -46,13 +45,16 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.jar.JarEntry; import java.util.jar.JarFile; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; import java.util.stream.Stream; /** - * Collects per-testcase JaCoCo coverage and streams executed lines back to Keploy Enterprise. + * Collects per-testcase JaCoCo coverage and streams the executed probe set per + * class ({className -> [probeIdx]}) back to Keploy Enterprise. */ public final class KeployDedupAgent { @@ -68,7 +70,9 @@ public final class KeployDedupAgent { private static final AtomicBoolean STARTED = new AtomicBoolean(false); private static final AtomicBoolean SHUTDOWN_HOOK_REGISTERED = new AtomicBoolean(false); - private static volatile CommandServer commandServer; + // The active transport worker: a CommandServer (unix, local/docker) or a + // CoverageTcpClient (TCP, k8s). Both are Closeable so stop() is transport-agnostic. + private static volatile Closeable coverageWorker; private KeployDedupAgent() { } @@ -109,10 +113,44 @@ public static boolean start() { CoverageCollector collector = new CoverageCollector( new JacocoClient(resolveHost(), resolvePort()), new CoverageIndex()); - CommandServer server = new CommandServer(collector, new CoveragePublisher(new File(DATA_SOCKET_PATH))); - Thread thread = new Thread(server, "keploy-java-dedup-control"); + + Runnable worker; + String threadName; + String endpoint = resolveEndpoint(); + if (endpoint != null) { + // k8s: the collector lives in a different pod, so there is no shared + // /tmp. 
Dial the collector's TCP endpoint and run the same protocol. + int idx = endpoint.lastIndexOf(':'); + if (idx <= 0 || idx == endpoint.length() - 1) { + STARTED.set(false); + log(Level.SEVERE, "Invalid KEPLOY_COVERAGE_ENDPOINT '" + endpoint + "', expected host:port", null); + return false; + } + String host = endpoint.substring(0, idx); + int port; + try { + port = Integer.parseInt(endpoint.substring(idx + 1).trim()); + } catch (NumberFormatException e) { + STARTED.set(false); + log(Level.SEVERE, "Invalid port in KEPLOY_COVERAGE_ENDPOINT '" + endpoint + "'", e); + return false; + } + CoverageTcpClient client = new CoverageTcpClient(collector, host, port); + worker = client; + coverageWorker = client; + threadName = "keploy-java-dedup-tcp"; + log(Level.INFO, "Keploy dedup: TCP transport enabled, will dial collector at " + host + ":" + port, null); + } else { + // local/docker: SDK owns the unix control socket and pushes coverage + // back over the unix data socket on a pod-shared /tmp. + CommandServer server = new CommandServer(collector, new CoveragePublisher(new File(DATA_SOCKET_PATH))); + worker = server; + coverageWorker = server; + threadName = "keploy-java-dedup-control"; + } + + Thread thread = new Thread(worker, threadName); thread.setDaemon(true); - commandServer = server; thread.start(); registerShutdownHook(); return true; @@ -131,11 +169,15 @@ public static boolean isStarted() { * Stops the background control socket listener. 
*/ public static void stop() { - CommandServer server = commandServer; - if (server != null) { - server.close(); + Closeable worker = coverageWorker; + if (worker != null) { + try { + worker.close(); + } catch (IOException e) { + log(Level.FINE, "Failed to close Java dedup coverage worker", e); + } } - commandServer = null; + coverageWorker = null; STARTED.set(false); } @@ -190,6 +232,16 @@ private static int resolvePort() { } } + /** + * Returns the collector's TCP endpoint ("host:port") for k8s mode, or {@code null} + * to use the unix-socket transport (local/docker). The collector advertises this + * to the SDK via KEPLOY_COVERAGE_ENDPOINT when app and collector are in different pods. + */ + private static String resolveEndpoint() { + String value = envOrProperty("KEPLOY_COVERAGE_ENDPOINT", "keploy.coverage.endpoint", ""); + return value.trim().isEmpty() ? null : value.trim(); + } + private static String envOrProperty(String envKey, String propertyKey, String defaultValue) { String value = System.getenv(envKey); if (value == null || value.trim().isEmpty()) { @@ -205,6 +257,141 @@ private static String normalizePath(String path) { return path.replace(File.separatorChar, '/'); } + // Build-constant coverage metadata for one class, serialized into the + // manifest the offline CoverageReporter consumes. id = CRC64 class id + // (unsigned-hex), probeCount = JaCoCo probe array length. + private static final class ManifestEntry { + private String id; + private int probeCount; + } + + // bytecodeAlreadyStored HEAD-checks whether k8s-proxy already holds the + // bytecode blob for this build tag, so each ephemeral replay pod uploads + // at most once per build. Best-effort: any error => treat as absent and + // attempt the upload (the server dedupes idempotently by buildTag anyway). + private static boolean bytecodeAlreadyStored(String baseUrl, String buildTag) { + String target = baseUrl + (baseUrl.contains("?") ? 
"&" : "?") + "buildTag=" + urlEncode(buildTag); + try { + log(Level.INFO, "Keploy dedup: exists-check HEAD " + target, null); + URL u = new URL(target); + HttpURLConnection conn = (HttpURLConnection) u.openConnection(); + relaxTlsIfHttps(conn); + conn.setRequestMethod("HEAD"); + conn.setConnectTimeout(SOCKET_TIMEOUT_MILLIS); + conn.setReadTimeout(SOCKET_TIMEOUT_MILLIS); + int code = conn.getResponseCode(); + conn.disconnect(); + log(Level.INFO, "Keploy dedup: exists-check HEAD -> HTTP " + code, null); + return code == HttpURLConnection.HTTP_OK; + } catch (Exception e) { + log(Level.WARNING, "Keploy dedup: exists-check HEAD failed (treating as absent): " + + e.getClass().getName() + ": " + e.getMessage(), null); + return false; + } + } + + // postBytecode uploads the manifest JSON + classes zip as a multipart form + // to k8s-proxy's bytecode endpoint, tagged with the build tag. + private static void postBytecode(String baseUrl, String buildTag, String manifestJson, byte[] zipBytes) + throws IOException { + String boundary = "keployBytecodeBoundary" + Integer.toHexString(System.identityHashCode(zipBytes)); + String target = baseUrl + (baseUrl.contains("?") ? 
"&" : "?") + "buildTag=" + urlEncode(buildTag); + log(Level.INFO, "Keploy dedup: POST bytecode -> " + target + + " (manifest=" + manifestJson.length() + "B, zip=" + zipBytes.length + "B)", null); + URL u = new URL(target); + HttpURLConnection conn = (HttpURLConnection) u.openConnection(); + relaxTlsIfHttps(conn); + conn.setDoOutput(true); + conn.setRequestMethod("POST"); + conn.setConnectTimeout(SOCKET_TIMEOUT_MILLIS); + conn.setReadTimeout(30000); + conn.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary); + try (OutputStream out = conn.getOutputStream()) { + log(Level.INFO, "Keploy dedup: POST bytecode connection open; streaming body", null); + writeAscii(out, "--" + boundary + "\r\n"); + writeAscii(out, "Content-Disposition: form-data; name=\"manifest\"; filename=\"manifest.json\"\r\n"); + writeAscii(out, "Content-Type: application/json\r\n\r\n"); + out.write(manifestJson.getBytes(StandardCharsets.UTF_8)); + writeAscii(out, "\r\n--" + boundary + "\r\n"); + writeAscii(out, "Content-Disposition: form-data; name=\"classes\"; filename=\"classes.zip\"\r\n"); + writeAscii(out, "Content-Type: application/zip\r\n\r\n"); + out.write(zipBytes); + writeAscii(out, "\r\n--" + boundary + "--\r\n"); + out.flush(); + } + int code = conn.getResponseCode(); + String body = readStream(code >= 400 ? conn.getErrorStream() : conn.getInputStream()); + conn.disconnect(); + log(Level.INFO, "Keploy dedup: POST bytecode -> HTTP " + code + + (body.isEmpty() ? "" : (" body=" + body)), null); + if (code / 100 != 2) { + throw new IOException("bytecode upload returned HTTP " + code + (body.isEmpty() ? "" : (": " + body))); + } + } + + private static String readStream(InputStream in) { + if (in == null) { + return ""; + } + try { + byte[] b = readAllBytes(in); + String s = new String(b, StandardCharsets.UTF_8).trim(); + return s.length() > 300 ? 
s.substring(0, 300) : s; + } catch (Exception e) { + return ""; + } + } + + private static void writeAscii(OutputStream out, String s) throws IOException { + out.write(s.getBytes(StandardCharsets.US_ASCII)); + } + + // relaxTlsIfHttps makes the bytecode upload tolerate k8s-proxy's self-signed + // in-cluster cert. The upload is a best-effort, cluster-internal data-plane + // call (like the raw-TCP coverage collector), so trust-all here is acceptable + // and avoids depending on the app JVM's truststore chaining to the proxy CA. + private static volatile javax.net.ssl.SSLSocketFactory trustAllFactory; + + private static void relaxTlsIfHttps(HttpURLConnection conn) { + if (!(conn instanceof javax.net.ssl.HttpsURLConnection)) { + return; + } + try { + if (trustAllFactory == null) { + javax.net.ssl.SSLContext ctx = javax.net.ssl.SSLContext.getInstance("TLS"); + ctx.init(null, new javax.net.ssl.TrustManager[]{new javax.net.ssl.X509TrustManager() { + @Override + public void checkClientTrusted(java.security.cert.X509Certificate[] c, String a) { + } + + @Override + public void checkServerTrusted(java.security.cert.X509Certificate[] c, String a) { + } + + @Override + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return new java.security.cert.X509Certificate[0]; + } + }}, null); + trustAllFactory = ctx.getSocketFactory(); + } + javax.net.ssl.HttpsURLConnection https = (javax.net.ssl.HttpsURLConnection) conn; + https.setSSLSocketFactory(trustAllFactory); + https.setHostnameVerifier((hostname, session) -> true); + log(Level.INFO, "Keploy dedup: relaxed TLS (trust-all) for bytecode upload", null); + } catch (Exception e) { + log(Level.WARNING, "Keploy dedup: failed to relax TLS: " + e.getMessage(), null); + } + } + + private static String urlEncode(String s) { + try { + return URLEncoder.encode(s, StandardCharsets.UTF_8.name()); + } catch (Exception e) { + return s; + } + } + private static void deleteSocketFile(File file) { if (file.exists() && 
!file.delete()) { log(Level.FINE, "Failed to delete socket file " + file.getAbsolutePath(), null); @@ -306,6 +493,9 @@ private void dispatch(CoverageCommand command, OutputStream outputStream) { synchronized (testCaseLock) { if (command.action == CommandAction.START) { activeTestId = command.testId; + // Warm up app classes once before the first measured window so + // one-time lines aren't charged to the first test. + collector.warmup(); collector.reset(); writeAck(outputStream); return; @@ -361,6 +551,161 @@ public void close() { } } + /** + * TCP transport (k8s): the SDK dials the collector and keeps one bidirectional + * connection open for the whole replay. Mirrors {@link CommandServer}'s dispatch + * but inverts the roles — here the SDK is the client. Wire protocol: + *
+     *   collector -> SDK : "START <id>" | "END <id>"
+     *   SDK -> collector : "ACK"                        (after START reset)
+     *                      "COV <compact-json>" + "ACK" (after END dump)
+     * 
+ * The collector starts listening only when replay begins, so the connect loop + * retries until it is reachable. + */ + private static final class CoverageTcpClient implements Runnable, Closeable { + + private static final long RECONNECT_DELAY_MILLIS = 1000; + + private final CoverageCollector collector; + private final String host; + private final int port; + private final AtomicBoolean running = new AtomicBoolean(true); + private final Object testCaseLock = new Object(); + private volatile Socket socket; + private String activeTestId = ""; + // Connect retries spin ~1/s until the collector is reachable; log the first + // failure at INFO and the rest at FINE so we don't spam k8s app logs. + private boolean connectFailureLogged = false; + + CoverageTcpClient(CoverageCollector collector, String host, int port) { + this.collector = collector; + this.host = host; + this.port = port; + } + + @Override + public void run() { + while (running.get()) { + try { + connectAndServe(); + } catch (IOException e) { + if (running.get()) { + Level level = connectFailureLogged ? Level.FINE : Level.INFO; + connectFailureLogged = true; + log(level, "Keploy dedup: TCP connect to " + host + ":" + port + + " failed (" + e.getClass().getSimpleName() + ": " + e.getMessage() + "), retrying", null); + } + } + if (running.get()) { + try { + Thread.sleep(RECONNECT_DELAY_MILLIS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + } + } + + private void connectAndServe() throws IOException { + Socket open = new Socket(); + // Bounded connect, but NO read timeout: the connection is long-lived and + // idles between tests waiting for the next START/END command. 
+ open.connect(new InetSocketAddress(InetAddress.getByName(host), port), SOCKET_TIMEOUT_MILLIS); + socket = open; + connectFailureLogged = false; + log(Level.INFO, "Keploy dedup: connected to collector at " + host + ":" + port, null); + try (Socket active = open; + BufferedReader reader = new BufferedReader( + new InputStreamReader(active.getInputStream(), StandardCharsets.UTF_8))) { + OutputStream out = active.getOutputStream(); + String line; + while (running.get() && (line = reader.readLine()) != null) { + String trimmed = line.trim(); + if (trimmed.isEmpty()) { + continue; + } + CoverageCommand command = CoverageCommand.parse(trimmed); + if (command == null) { + continue; + } + dispatch(command, out); + } + } finally { + socket = null; + } + } + + private void dispatch(CoverageCommand command, OutputStream out) throws IOException { + synchronized (testCaseLock) { + if (command.action == CommandAction.START) { + activeTestId = command.testId; + // Warm up app classes once before the first measured window so + // one-time lines aren't charged to the first test. + collector.warmup(); + collector.reset(); + writeLine(out, "ACK"); + return; + } + + if (command.action == CommandAction.END) { + if (!command.testId.equals(activeTestId)) { + log(Level.SEVERE, + "Ignoring mismatched END command. expected=" + activeTestId + ", actual=" + + command.testId, + null); + writeLine(out, "ACK"); + return; + } + + try { + Map> executedProbesByClass = collector.capture(); + if (executedProbesByClass.isEmpty()) { + log(Level.FINE, "No Java coverage collected for " + command.testId, null); + } + // Always emit COV before ACK — even when empty — so the + // line-oriented collector reads exactly one COV per END (the + // unix transport likewise always publishes). The payload is + // recorded before the ACK releases the caller. 
+ writeLine(out, "COV " + GSON.toJson( + new DedupPayload(command.testId, executedProbesByClass))); + // Ride the bytecode+manifest over this same raw-TCP channel + // (the only app->proxy path the replay agent doesn't mock- + // intercept), when the manifest has grown. + String classesFrame = collector.pollBytecodeFrame(); + if (classesFrame != null) { + writeLine(out, classesFrame); + } + } catch (Exception e) { + log(Level.SEVERE, "Failed to collect Java coverage for " + command.testId, e); + } finally { + activeTestId = ""; + writeLine(out, "ACK"); + } + } + } + } + + private void writeLine(OutputStream out, String message) throws IOException { + out.write((message + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + } + + @Override + public void close() { + running.set(false); + Socket open = socket; + if (open != null) { + try { + open.close(); + } catch (IOException e) { + log(Level.FINE, "Failed to close Java dedup TCP socket", e); + } + } + } + } + private enum CommandAction { START, END @@ -399,6 +744,18 @@ private static final class CoverageCollector { private final JacocoClient jacocoClient; private final CoverageIndex coverageIndex; + private final AtomicBoolean warmed = new AtomicBoolean(false); + // Coverage manifest ({vmClassName -> {classId, probeCount}}) accumulated + // from the SAME per-test capture() path that produces dedup fingerprints — + // so its class matching is proven correct (a separate post-warmup dump + // raced the START reset and matched 0 classes). Build-constant, grows as + // new app classes are hit. Uploaded (overwrite) whenever it grows, so the + // final upload covers every class in the coverage union. 
+ private final java.util.concurrent.ConcurrentMap liveManifest = + new java.util.concurrent.ConcurrentHashMap<>(); + private final java.util.concurrent.atomic.AtomicInteger lastUploadedManifestSize = + new java.util.concurrent.atomic.AtomicInteger(-1); + private final AtomicBoolean uploadInFlight = new AtomicBoolean(false); private CoverageCollector(JacocoClient jacocoClient, CoverageIndex coverageIndex) { this.jacocoClient = jacocoClient; @@ -413,6 +770,73 @@ private void reset() { } } + /** + * Eagerly initialize every indexed application class so their static + * initializers (<clinit>) run ONCE here, before the first test's + * coverage window. The very first request to a fresh JVM otherwise pays + * the one-time class-init cost, and JaCoCo charges those <clinit> + * lines to whichever test ran first — making the duplicate set + * non-deterministic run-to-run. Running them now (then letting the START + * reset clear the counters) means every test sees only the lines its own + * request executes. Called once, on the first START, when the app is + * fully started. Best-effort: per-class failures are ignored (the class + * just falls back to lazy init). Disable with + * KEPLOY_JAVA_DEDUP_WARMUP_DISABLED=true if a static initializer has + * harmful side effects. 
+ */ + private void warmup() { + if (isWarmupDisabled() || !warmed.compareAndSet(false, true)) { + return; + } + ClassLoader[] loaders = warmupLoaders(); + int initialized = 0; + int failed = 0; + for (ClassEntry entry : coverageIndex.entries()) { + if (initializeClass(entry.className.replace('/', '.'), loaders)) { + initialized++; + } else { + failed++; + } + } + log(Level.INFO, "Keploy dedup: warmed up application classes (initialized=" + + initialized + ", skipped=" + failed + ")", null); + } + + private boolean isWarmupDisabled() { + return isTruthy(envOrProperty("KEPLOY_JAVA_DEDUP_WARMUP_DISABLED", + "keploy.java.dedup.warmup.disabled", "")); + } + + private ClassLoader[] warmupLoaders() { + List loaders = new ArrayList<>(3); + ClassLoader ctx = Thread.currentThread().getContextClassLoader(); + if (ctx != null) { + loaders.add(ctx); + } + ClassLoader sys = ClassLoader.getSystemClassLoader(); + if (sys != null && !loaders.contains(sys)) { + loaders.add(sys); + } + ClassLoader own = KeployDedupAgent.class.getClassLoader(); + if (own != null && !loaders.contains(own)) { + loaders.add(own); + } + return loaders.toArray(new ClassLoader[0]); + } + + private boolean initializeClass(String binaryName, ClassLoader[] loaders) { + for (ClassLoader loader : loaders) { + try { + Class.forName(binaryName, true, loader); + return true; + } catch (Throwable ignored) { + // Try the next loader; a class that no loader can initialize + // (or whose throws) is simply left to lazy init. 
+ } + } + return false; + } + private Map> capture() throws IOException { byte[] dump = jacocoClient.dump(true, true); if (dump.length == 0) { @@ -425,73 +849,70 @@ private Map> capture() throws IOException { ExecFileLoader loader = new ExecFileLoader(); loader.load(new ByteArrayInputStream(dump)); ExecutionDataStore executionDataStore = loader.getExecutionDataStore(); - Set hitClasses = hitClassNames(executionDataStore); - if (hitClasses.isEmpty()) { + + // BRANCH coverage: fingerprint by the set of executed JaCoCo PROBES per + // class, NOT just executed lines. Each branch is instrumented as a + // distinct probe, so the executed-probe set distinguishes WHICH branch a + // test took (true vs false) — line status, and even branch counts, report + // identically for the true-path and false-path test. The probe set also + // subsumes line coverage (lines map to probes). Probe indices are stable + // for a given class bytecode, so the set is comparable across the run. + // Keyed by VM class name (canonical "com/foo/Bar"; no file-path + // normalization needed). CoverageIndex is used only to restrict to the + // app's own classes so JDK/library probes don't pollute the fingerprint. 
+ Set appClasses = indexedClassNames(); + if (appClasses.isEmpty()) { if (diagnosticsEnabled()) { - diagnostic("execution data had no hit classes"); + diagnostic("coverage index has no app classes"); } return Collections.emptyMap(); } - List indexedEntries = coverageIndex.entries(); - if (diagnosticsEnabled()) { - diagnostic("hitClasses=" + hitClasses.size() - + ", indexedEntries=" + indexedEntries.size() - + ", sampleHits=" + summarizeStrings(hitClasses, 5) - + ", sampleEntries=" + summarizeClassEntries(indexedEntries, 5)); - } - - CoverageBuilder coverageBuilder = new CoverageBuilder(); - Analyzer analyzer = new Analyzer(executionDataStore, coverageBuilder); - for (ClassEntry classEntry : indexedEntries) { - if (!hitClasses.contains(classEntry.className)) { - continue; - } - try { - analyzer.analyzeClass(classEntry.bytes, classEntry.location); - } catch (IOException | RuntimeException e) { - log(Level.FINE, "Skipping unreadable Java class " + classEntry.location, e); - } - } - - if (diagnosticsEnabled()) { - diagnostic("analyzedClasses=" + coverageBuilder.getClasses().size()); - } - Map> raw = new LinkedHashMap<>(); - for (IClassCoverage classCoverage : coverageBuilder.getClasses()) { - if (!classCoverage.containsCode()) { + for (ExecutionData executionData : executionDataStore.getContents()) { + if (!executionData.hasHits()) { continue; } - - List executedLines = executedLines(classCoverage); - if (executedLines.isEmpty()) { + if (!appClasses.contains(executionData.getName())) { continue; } - - String sourcePath = resolveSourcePath(classCoverage); - Set merged = raw.get(sourcePath); - if (merged == null) { - merged = new LinkedHashSet<>(); - raw.put(sourcePath, merged); + boolean[] probes = executionData.getProbes(); + // Record this class's build-constant (classId, probeCount) for the + // coverage manifest. Same matching as the fingerprint below, so the + // manifest can never miss a class that appears in the union. 
+ final int probeCount = probes.length; + final long classId = executionData.getId(); + liveManifest.computeIfAbsent(executionData.getName(), k -> { + ManifestEntry m = new ManifestEntry(); + m.id = Long.toHexString(classId); + m.probeCount = probeCount; + return m; + }); + Set fired = new LinkedHashSet<>(); + for (int i = 0; i < probes.length; i++) { + if (probes[i]) { + fired.add(i); + } + } + if (!fired.isEmpty()) { + raw.put(executionData.getName(), fired); } - merged.addAll(executedLines); } if (diagnosticsEnabled()) { - diagnostic("filesWithCoverage=" + raw.size() - + ", sampleFiles=" + summarizeStrings(raw.keySet(), 5)); + diagnostic("classesWithProbes=" + raw.size() + + ", sampleClasses=" + summarizeStrings(raw.keySet(), 5)); } return toSortedMap(raw); } - private Set hitClassNames(ExecutionDataStore executionDataStore) { + // indexedClassNames returns the VM names of the app's own classes (from the + // CoverageIndex) so probe collection can skip JDK/library classes. + private Set indexedClassNames() { Set names = new LinkedHashSet<>(); - for (ExecutionData executionData : executionDataStore.getContents()) { - if (executionData.hasHits()) { - names.add(executionData.getName()); - } + for (ClassEntry entry : coverageIndex.entries()) { + names.add(entry.className); } return names; } @@ -507,51 +928,6 @@ private String summarizeStrings(Iterable values, int limit) { return sample.toString(); } - private String summarizeClassEntries(List values, int limit) { - List sample = new ArrayList<>(); - for (ClassEntry value : values) { - sample.add(value.className); - if (sample.size() >= limit) { - break; - } - } - return sample.toString(); - } - - private List executedLines(IClassCoverage classCoverage) { - int firstLine = classCoverage.getFirstLine(); - int lastLine = classCoverage.getLastLine(); - if (firstLine < 0 || lastLine < firstLine) { - return Collections.emptyList(); - } - - List lines = new ArrayList<>(); - for (int line = firstLine; line <= lastLine; 
line++) { - int status = classCoverage.getLine(line).getStatus(); - if (status != ICounter.EMPTY && status != ICounter.NOT_COVERED) { - lines.add(line); - } - } - return lines; - } - - private String resolveSourcePath(IClassCoverage classCoverage) { - String sourceFile = classCoverage.getSourceFileName(); - if (sourceFile == null || sourceFile.trim().isEmpty()) { - return normalizePath(classCoverage.getName() + ".java"); - } - - String packageName = classCoverage.getPackageName(); - String relativePath = packageName == null || packageName.isEmpty() - ? sourceFile - : packageName + "/" + sourceFile; - File localSource = new File(System.getProperty("user.dir"), "src/main/java/" + relativePath); - if (localSource.exists()) { - return normalizePath(localSource.getAbsolutePath()); - } - return normalizePath(relativePath); - } - private Map> toSortedMap(Map> raw) { List files = new ArrayList<>(raw.keySet()); Collections.sort(files); @@ -564,6 +940,59 @@ private Map> toSortedMap(Map> raw) { } return sorted; } + + // pollBytecodeFrame returns a "CLASSES " + // line to send over the collector channel (:36340) IF the coverage manifest + // has grown since the last send, else null. The raw-TCP collector channel + // is the ONLY app->proxy path that survives the replay agent's mock + // interception (an HTTPS upload gets 502'd as an unmatched outbound mock), + // so the bytecode rides it alongside the COV frames. buildTag/manifest/zip + // are base64'd to keep the whole frame on one line. Called after each + // capture; a no-op until the manifest grows or when KEPLOY_BUILD_TAG unset. 
+ String pollBytecodeFrame() { + final String buildTag = envOrProperty("KEPLOY_BUILD_TAG", "keploy.build.tag", ""); + if (buildTag.isEmpty()) { + return null; + } + final int size = liveManifest.size(); + if (size == 0 || size <= lastUploadedManifestSize.get()) { + return null; // nothing new since the last send + } + try { + Map snapshot = new LinkedHashMap<>(liveManifest); + byte[] zip = zipIndexedClasses(); + String manifestJson = GSON.toJson(snapshot); + java.util.Base64.Encoder enc = java.util.Base64.getEncoder(); + String frame = "CLASSES " + + enc.encodeToString(buildTag.getBytes(StandardCharsets.UTF_8)) + " " + + enc.encodeToString(manifestJson.getBytes(StandardCharsets.UTF_8)) + " " + + enc.encodeToString(zip); + lastUploadedManifestSize.set(size); + log(Level.INFO, "Keploy dedup: sending CLASSES frame over collector channel (classes=" + + size + ", zipBytes=" + zip.length + ")", null); + return frame; + } catch (Throwable e) { + log(Level.WARNING, "Keploy dedup: failed to build CLASSES frame: " + e.getMessage(), null); + return null; + } + } + + // zipIndexedClasses packs every indexed app .class (already in memory as + // ClassEntry.bytes) into a zip keyed by ".class". 
+ private byte[] zipIndexedClasses() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(256 * 1024); + int n = 0; + try (ZipOutputStream zos = new ZipOutputStream(baos)) { + for (ClassEntry entry : coverageIndex.entries()) { + zos.putNextEntry(new ZipEntry(entry.className + ".class")); + zos.write(entry.bytes); + zos.closeEntry(); + n++; + } + } + log(Level.INFO, "Keploy dedup: zipIndexedClasses entries=" + n + " bytes=" + baos.size(), null); + return baos.toByteArray(); + } } private static final class JacocoClient { diff --git a/scripts/smoke-javaagent.sh b/scripts/smoke-javaagent.sh index 39058ee..29af495 100755 --- a/scripts/smoke-javaagent.sh +++ b/scripts/smoke-javaagent.sh @@ -108,9 +108,11 @@ public final class SmokeHarness { } String json = payload.get(); + // The fingerprint is keyed by VM class name (probe-based coverage), + // e.g. "smoke/Work" — not a source file like "Work.java". if (json == null || !json.contains("\"id\":\"test-set-0/" + mode + "\"") - || !json.contains("Work.java")) { + || !json.contains("smoke/Work")) { throw new IllegalStateException("unexpected coverage payload for " + mode + ": " + json); } System.out.println(mode + ": " + json);