iperf-watcher/src/main.zig
2023-10-14 21:19:20 -05:00

230 lines
7.2 KiB
Zig

const std = @import("std");
const datetime = @import("datetime");
const iperf3 = @import("iperf3.zig");
const loki = @import("loki.zig");
pub const std_options = struct {
pub const log_level = .debug;
pub const logFn = myLogFn;
};
pub fn myLogFn(
comptime level: std.log.Level,
comptime scope: @TypeOf(.EnumLiteral),
comptime format: []const u8,
args: anytype,
) void {
// const scope_prefix = "(" ++ switch (scope) {
// .iperf3, std.log.default_log_scope => @tagName(scope),
// else => if (@intFromEnum(level) <= @intFromEnum(std.log.Level.debug))
// @tagName(scope)
// else
// return,
// } ++ "): ";
const scope_prefix = "(" ++ @tagName(scope) ++ "): ";
const prefix = " [" ++ comptime level.asText() ++ "] " ++ scope_prefix;
// Print the message to stderr, silently ignoring any errors
std.debug.getStderrMutex().lock();
defer std.debug.getStderrMutex().unlock();
const stderr = std.io.getStdErr().writer();
const now = datetime.Instant.now().asDateTime();
nosuspend now.format("YYYY-MM-DDTHH:mm:ss.SSSSSSSSS", .{}, stderr) catch return;
nosuspend stderr.writeAll(prefix) catch return;
nosuspend stderr.print(format, args) catch return;
if (format[format.len - 1] != '\n') nosuspend stderr.writeAll("\n") catch return;
}
const Config = struct {
loki: struct {
url: []const u8,
username: []const u8,
password: ?[]const u8 = null,
password_file: ?[]const u8 = null,
},
iperf3: struct {
path: ?[]const u8 = null,
port: ?u16 = null,
},
pub fn deinit(self: @This(), allocator: std.mem.Allocator) void {
allocator.free(self.loki.url);
allocator.free(self.loki.username);
if (self.loki.password) |password| allocator.free(password);
if (self.loki.password_file) |password_file| allocator.free(password_file);
if (self.iperf3.path) |path| allocator.free(path);
}
};
pub fn ConfigWrapper(comptime T: type) type {
return struct {
arena: *std.heap.ArenaAllocator,
value: T,
pub fn deinit(self: @This()) void {
const allocator = self.arena.child_allocator;
self.arena.deinit();
allocator.destroy(self.arena);
}
};
}
fn readConfig(allocator: std.mem.Allocator, path: []const u8) !ConfigWrapper(Config) {
var config = ConfigWrapper(Config){
.arena = try allocator.create(std.heap.ArenaAllocator),
.value = undefined,
};
config.arena.child_allocator = allocator;
const data = try std.fs.cwd().readFileAlloc(config.arena.child_allocator, path, 1024);
config.value = try std.json.parseFromSliceLeaky(Config, config.arena.child_allocator, data, .{});
return config;
}
pub fn main() !void {
const iperf3_log = std.log.scoped(.iperf3);
const hostname = blk: {
var buffer: [std.os.HOST_NAME_MAX]u8 = undefined;
const name = try std.os.gethostname(&buffer);
iperf3_log.debug("{s}", .{name});
break :blk name;
};
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
const config = try readConfig(allocator, "config.json");
const b64 = std.base64.standard.Encoder;
var auth_buf: [256]u8 = undefined;
const auth = try std.fmt.bufPrint(
&auth_buf,
"{s}:{s}",
.{
config.value.loki.username,
config.value.loki.password.?,
},
);
var auth_encoded_buf: [b64.calcSize(auth_buf.len)]u8 = undefined;
var auth_encoded = b64.encode(&auth_encoded_buf, auth);
var auth_header_buf: [256]u8 = undefined;
var auth_header = try std.fmt.bufPrint(&auth_header_buf, "Basic {s}", .{auth_encoded});
const uri = try std.Uri.parse(config.value.loki.url);
// var headers = std.http.Headers{ .allocator = allocator };
// try headers.append("Authorization", auth_header);
// try headers.append("Content-Type", "application/json");
var client = std.http.Client{ .allocator = allocator };
defer client.deinit();
// const stderr_file = std.io.getStdErr().writer();
// var stderr_bw = std.io.bufferedWriter(stderr_file);
// const stderr = stderr_bw.writer();
// _ = stderr;
var port_buf: [16]u8 = undefined;
var port: []u8 = undefined;
if (config.value.iperf3.port) |p| {
port = try std.fmt.bufPrint(&port_buf, "{d}", .{p});
} else {
port = try std.fmt.bufPrint(&port_buf, "{d}", .{5201});
}
while (true) {
iperf3_log.info("waiting for connection", .{});
// try stderr_bw.flush();
var c = std.process.Child.init(
&[_][]const u8{
if (config.value.iperf3.path) |path| path else "iperf3",
"--server",
"--port",
port,
"--json",
"--one-off",
},
allocator,
);
c.stdin_behavior = .Ignore;
c.stdout_behavior = .Pipe;
c.stderr_behavior = .Ignore;
try c.spawn();
var reader = c.stdout.?.reader();
var token_reader = std.json.reader(allocator, reader);
var obj = try std.json.parseFromTokenSource(
iperf3.IPerfReturn,
allocator,
&token_reader,
.{},
);
defer obj.deinit();
var line = std.ArrayList(u8).init(allocator);
try std.json.stringify(
obj.value,
.{
.emit_null_optional_fields = false,
},
line.writer(),
);
defer line.deinit();
const streams = loki.LokiStreams{
.streams = &[_]loki.LokiStream{
.{
.stream = .{
.job = "iperf3",
.server = hostname,
.level = if (obj.value.@"error" == null) .info else .@"error",
},
.values = &[_]loki.LokiValue{
.{
.ts = std.time.nanoTimestamp(),
.line = line.items,
},
},
},
},
};
var data = std.ArrayList(u8).init(allocator);
try std.json.stringify(
streams,
.{
.emit_null_optional_fields = false,
},
data.writer(),
);
defer data.deinit();
// try stderr.writeAll(data.items);
// try stderr.writeAll("\n");
// try stderr_bw.flush();
var content_length_buf: [16]u8 = undefined;
var content_length = try std.fmt.bufPrint(
&content_length_buf,
"{d}",
.{data.items.len},
);
var headers = std.http.Headers{ .allocator = allocator };
try headers.append("Authorization", auth_header);
try headers.append("Accept", "application/json");
try headers.append("Content-Type", "application/json");
try headers.append("Content-Length", content_length);
var req = try client.request(.POST, uri, headers, .{});
try req.start();
try req.writeAll(data.items);
try req.finish();
try req.wait();
iperf3_log.info("{}\n", .{req.response.status});
}
}